This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cc067a10a9 [HUDI-8947] Fixing log file naming with MOR table writes 
using table version 6 (#12746)
8cc067a10a9 is described below

commit 8cc067a10a94e79875b80fcbdf15119cf3e6fb1e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jan 31 08:23:42 2025 -0800

    [HUDI-8947] Fixing log file naming with MOR table writes using table 
version 6 (#12746)
    
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  12 +-
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |   4 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  53 +++--
 .../providers/HoodieMetaClientProvider.java        |   3 +
 .../FlinkMergeAndReplaceHandleWithChangeLog.java   |   2 +-
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |   2 +-
 .../hudi/testutils/FunctionalTestHarness.java      |  11 +-
 .../SparkClientFunctionalTestHarness.java          |  35 ++-
 .../common/table/log/HoodieLogFormatWriter.java    |   3 +
 .../hudi/table/TestHoodieMergeOnReadTable.java     | 119 ++++++++++
 .../apache/hudi/testutils/DataSourceTestUtils.java |  11 +
 .../apache/hudi/functional/TestMORDataSource.scala | 240 +++++++++++++--------
 .../hudi/dml/TestMergeModeCommitTimeOrdering.scala |   3 +-
 13 files changed, 374 insertions(+), 124 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 107e0b62ef6..038963168ce 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -178,11 +178,12 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
         .map(HoodieBaseFile::getCommitTime);
   }
 
-  private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat 
deltaWriteStat) {
+  private Option<FileSlice> populateWriteStatAndFetchFileSlice(HoodieRecord 
record, HoodieDeltaWriteStat deltaWriteStat) {
     HoodieTableVersion tableVersion = hoodieTable.version();
     String prevCommit;
     String baseFile = "";
     List<String> logFiles = new ArrayList<>();
+    Option<FileSlice> fileSlice = Option.empty();
 
     if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
       // table versions 8 and greater.
@@ -190,7 +191,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
         // the cdc reader needs the base file metadata to have deterministic 
update sequence.
         TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
-        Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
+        fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
         if (fileSlice.isPresent()) {
           prevCommit = fileSlice.get().getBaseInstantTime();
           baseFile = 
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
@@ -200,7 +201,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     } else {
       // older table versions.
       TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
-      Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
+      fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
       if (fileSlice.isPresent()) {
         prevCommit = fileSlice.get().getBaseInstantTime();
         baseFile = 
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
@@ -218,6 +219,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     deltaWriteStat.setPrevCommit(prevCommit);
     deltaWriteStat.setBaseFile(baseFile);
     deltaWriteStat.setLogFiles(logFiles);
+    return fileSlice;
   }
 
   private void init(HoodieRecord record) {
@@ -231,7 +233,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     writeStatus.setPartitionPath(partitionPath);
     deltaWriteStat.setPartitionPath(partitionPath);
     deltaWriteStat.setFileId(fileId);
-    populateWriteStat(record, deltaWriteStat);
+    Option<FileSlice> fileSliceOpt = 
populateWriteStatAndFetchFileSlice(record, deltaWriteStat);
     averageRecordSize = sizeEstimator.sizeEstimate(record);
     try {
       // Save hoodie partition meta in the partition path
@@ -243,7 +245,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
 
       String instantTime = 
config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
           ? getInstantTimeForLogFile(record) : deltaWriteStat.getPrevCommit();
-      this.writer = createLogWriter(instantTime);
+      this.writer = createLogWriter(instantTime, fileSliceOpt);
     } catch (Exception e) {
       LOG.error("Error in update task at commit " + instantTime, e);
       writeStatus.setGlobalError(e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 08be71288f9..2f1b629096f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -62,7 +62,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> 
extends HoodieMergeHandl
         partitionPath,
         storage,
         getWriterSchema(),
-        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, 
Option.empty()),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
@@ -80,7 +80,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> 
extends HoodieMergeHandl
         partitionPath,
         storage,
         getWriterSchema(),
-        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, 
Option.empty()),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 73f05789083..0bda8706ec3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -30,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.LogFileCreationCallback;
 import org.apache.hudi.common.util.HoodieTimer;
@@ -234,25 +236,44 @@ public abstract class HoodieWriteHandle<T, I, K, O> 
extends HoodieIOHandle<T, I,
     return new Schema.Parser().parse(config.getWriteSchema());
   }
 
-  protected HoodieLogFormat.Writer createLogWriter(String instantTime) {
-    return createLogWriter(instantTime, null);
+  protected HoodieLogFormat.Writer createLogWriter(String instantTime, 
Option<FileSlice> fileSliceOpt) {
+    return createLogWriter(instantTime, null, fileSliceOpt);
   }
 
-  protected HoodieLogFormat.Writer createLogWriter(String instantTime, String 
fileSuffix) {
+  protected HoodieLogFormat.Writer createLogWriter(String instantTime, String 
fileSuffix, Option<FileSlice> fileSliceOpt) {
     try {
-      return HoodieLogFormat.newWriterBuilder()
-          
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
-          .withFileId(fileId)
-          .withInstantTime(instantTime)
-          .withFileSize(0L)
-          .withSizeThreshold(config.getLogFileMaxSize())
-          .withStorage(storage)
-          .withLogWriteToken(writeToken)
-          .withFileCreationCallback(getLogCreationCallback())
-          .withTableVersion(config.getWriteVersion())
-          .withSuffix(fileSuffix)
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-          .build();
+      if 
(config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+        return HoodieLogFormat.newWriterBuilder()
+            
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
+            .withFileId(fileId)
+            .withInstantTime(instantTime)
+            .withFileSize(0L)
+            .withSizeThreshold(config.getLogFileMaxSize())
+            .withStorage(storage)
+            .withLogWriteToken(writeToken)
+            .withFileCreationCallback(getLogCreationCallback())
+            .withTableVersion(config.getWriteVersion())
+            .withSuffix(fileSuffix)
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .build();
+      } else {
+        Option<HoodieLogFile> latestLogFile = fileSliceOpt.isPresent()
+            ? fileSliceOpt.get().getLatestLogFile()
+            : Option.empty();
+        return HoodieLogFormat.newWriterBuilder()
+            
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
+            .withFileId(fileId)
+            .withInstantTime(instantTime)
+            
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
+            
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
+            .withSizeThreshold(config.getLogFileMaxSize())
+            .withStorage(storage)
+            
.withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken))
+            .withSuffix(fileSuffix)
+            .withFileCreationCallback(getLogCreationCallback())
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .build();
+      }
     } catch (IOException e) {
       throw new HoodieException("Creating logger writer with fileId: " + 
fileId + ", "
           + "delta commit time: " + instantTime + ", "
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
index 23f0da3ce83..dec51f6b849 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.testutils.providers;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -36,6 +37,8 @@ public interface HoodieMetaClientProvider {
 
   HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> 
storageConf, String basePath, Properties props) throws IOException;
 
+  HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> 
storageConf, String basePath, Properties props, HoodieTableType tableType) 
throws IOException;
+
   default HoodieTableFileSystemView getHoodieTableFileSystemView(
       HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
       List<StoragePathInfo> pathInfoList) {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index c99e801b6ba..acb4ec224ba 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -64,7 +64,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, 
O>
         partitionPath,
         getStorage(),
         getWriterSchema(),
-        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, 
Option.empty()),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 0483096ce9a..d04b3b9717e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -62,7 +62,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
         partitionPath,
         getStorage(),
         getWriterSchema(),
-        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, 
Option.empty()),
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 79272b20aea..20c70a33aa8 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -122,15 +123,21 @@ public class FunctionalTestHarness implements 
SparkProvider, DFSProvider, Hoodie
   }
 
   @Override
-  public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> 
storageConf, String basePath, Properties props) throws IOException {
+  public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> 
storageConf, String basePath, Properties props,
+      HoodieTableType tableType) throws IOException {
     return HoodieTableMetaClient.newTableBuilder()
       .setTableName(RAW_TRIPS_TEST_NAME)
-      .setTableType(COPY_ON_WRITE)
+      .setTableType(tableType)
       .setPayloadClass(HoodieAvroPayload.class)
       .fromProperties(props)
       .initTable(storageConf.newInstance(), basePath);
   }
 
+  @Override
+  public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> 
storageConf, String basePath, Properties props) throws IOException {
+    return getHoodieMetaClient(storageConf, basePath, props, COPY_ON_WRITE);
+  }
+
   @Override
   public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) 
throws IOException {
     return new SparkRDDWriteClient(context(), cfg);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index a24710ce201..aba8573073c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -79,10 +79,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -192,9 +194,15 @@ public class SparkClientFunctionalTestHarness implements 
SparkProvider, HoodieMe
 
   @Override
   public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> 
storageConf, String basePath, Properties props) throws IOException {
+    return getHoodieMetaClient(storageConf, basePath, props, COPY_ON_WRITE);
+  }
+
+  @Override
+  public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> 
storageConf, String basePath, Properties props,
+                                                   HoodieTableType tableType) 
throws IOException {
     return HoodieTableMetaClient.newTableBuilder()
         .setTableName(RAW_TRIPS_TEST_NAME)
-        .setTableType(COPY_ON_WRITE)
+        .setTableType(tableType)
         .setPayloadClass(HoodieAvroPayload.class)
         .setTableVersion(ConfigUtils.getIntWithAltKeys(new 
TypedProperties(props), WRITE_TABLE_VERSION))
         .fromProperties(props)
@@ -430,4 +438,29 @@ public class SparkClientFunctionalTestHarness implements 
SparkProvider, HoodieMe
     timelineServicePort = (timelineServicePort + 1 - 1024) % (65536 - 1024) + 
1024;
     return timelineServicePort;
   }
+
+  /**
+   * Check if two dataframes are equal.
+   *
+   * @param expectedDf      expected dataframe
+   * @param actualDf        actual dataframe
+   * @param validateColumns columns to validate
+   * @return true if dataframes are equal, false otherwise
+   */
+  public static boolean areDataframesEqual(Dataset<Row> expectedDf, 
Dataset<Row> actualDf, Set<String> validateColumns) {
+    // Normalize schema order
+    String[] sortedColumnNames = Arrays.stream(expectedDf.columns())
+        .filter(validateColumns::contains).sorted().toArray(String[]::new);
+
+    // Reorder columns in both datasets
+    Dataset<Row> df1Normalized = expectedDf.selectExpr(sortedColumnNames);
+    Dataset<Row> df2Normalized = actualDf.selectExpr(sortedColumnNames);
+
+    // Sort rows
+    Dataset<Row> df1Sorted = df1Normalized.sort("_row_key");
+    Dataset<Row> df2Sorted = df2Normalized.sort("_row_key");
+
+    // Check for differences
+    return df1Sorted.except(df2Sorted).isEmpty() && 
df2Sorted.except(df1Sorted).isEmpty();
+  }
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 5c4c141947d..e2c75aa9367 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -102,6 +102,9 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
       boolean created = false;
       while (!created) {
         try {
+          if (storage.exists(logFile.getPath())) {
+            rollOver();
+          }
           // Block size does not matter as we will always manually auto-flush
           createNewFile();
           LOG.info("Created a new log file: {}", logFile);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 34841ba2059..006a1bd80c9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -24,9 +24,11 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -63,6 +65,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -74,6 +77,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +89,8 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static 
org.apache.hudi.testutils.HoodieSparkClientTestHarness.buildProfile;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -222,6 +228,119 @@ public class TestHoodieMergeOnReadTable extends 
SparkClientFunctionalTestHarness
     }
   }
 
+  @Test
+  public void testUpsertPartitionerWithTableVersionSix() throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
+    addConfigsForPopulateMetaFields(cfgBuilder, true);
+    cfgBuilder.withWriteTableVersion(6);
+    HoodieWriteConfig cfg = cfgBuilder.build();
+
+    // create meta client w/ the table version 6
+    Properties props = getPropertiesForKeyGen(true);
+    props.put(WRITE_TABLE_VERSION.key(), "6");
+    metaClient = getHoodieMetaClient(storageConf(), basePath(), props, 
HoodieTableType.MERGE_ON_READ);
+    dataGen = new HoodieTestDataGenerator();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+      // batch 1 insert
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+      List<WriteStatus> statuses = client.upsert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), 
metaClient);
+
+      Option<HoodieInstant> deltaCommit = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+      assertTrue(deltaCommit.isPresent());
+      assertEquals("001", deltaCommit.get().requestedTime(), "Delta commit 
should be 001");
+
+      Option<HoodieInstant> commit = 
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
+      assertFalse(commit.isPresent());
+
+      List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
+      BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
+          metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
+
+      Map<String, String> baseFileMapping = new HashMap<>();
+      Map<String, List<String>> baseFileToLogFileMapping = new HashMap<>();
+      BaseFileOnlyView finalRoView = roView;
+      
Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).forEach(partitionPath
 -> {
+        String baseFileName = 
finalRoView.getLatestBaseFiles(partitionPath).collect(Collectors.toList()).get(0).getFileName();
+        baseFileMapping.put(partitionPath, baseFileName);
+        baseFileToLogFileMapping.put(baseFileName, new ArrayList<>());
+      });
+
+      writeAndValidateLogFileBaseInstantTimeMatches(client, "002", records, 
cfg, baseFileMapping, baseFileToLogFileMapping);
+      writeAndValidateLogFileBaseInstantTimeMatches(client, "003", records, 
cfg, baseFileMapping, baseFileToLogFileMapping);
+      writeAndValidateLogFileBaseInstantTimeMatches(client, "004", records, 
cfg, baseFileMapping, baseFileToLogFileMapping);
+    }
+  }
+
+  private void 
writeAndValidateLogFileBaseInstantTimeMatches(SparkRDDWriteClient client, 
String newCommitTime, List<HoodieRecord> records,
+                                                             HoodieWriteConfig 
cfg, Map<String, String> baseFileMapping,
+                                                             Map<String, 
List<String>> baseFileToLogFileMapping) throws IOException {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> newRecords = dataGen.generateUpdates(newCommitTime, 
records);
+    List<WriteStatus> statuses = client.upsert(jsc().parallelize(newRecords), 
newCommitTime).collect();
+    // validate the data itself
+    validateNewData(newRecords);
+    assertNoWriteErrors(statuses);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    Option<HoodieInstant> deltaCommit = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+    assertTrue(deltaCommit.isPresent());
+    assertEquals(newCommitTime, deltaCommit.get().requestedTime(), "Latest 
Delta commit should be 002");
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), 
metaClient);
+    HoodieTable finalHoodieTable = hoodieTable;
+    baseFileMapping.entrySet().forEach(entry -> {
+          FileSlice fileSlice = 
finalHoodieTable.getSliceView().getLatestFileSlices(entry.getKey()).collect(Collectors.toList()).get(0);
+          String baseFileName = entry.getValue();
+          String baseInstantTime = FSUtils.getCommitTime(baseFileName);
+          // validate the base instant time matches
+          List<HoodieLogFile> logFiles = 
fileSlice.getLogFiles().collect(Collectors.toList());
+          // except latest log file, all other files should be present in the 
tracking map.
+          int counter = 0;
+          while (counter < logFiles.size()) {
+            HoodieLogFile logFile = logFiles.get(counter);
+            if (counter == logFiles.size() - 1) {
+              // latest log file may not be present in the tracking map. lets 
add it to assist w/ for next round of validation.
+              
baseFileToLogFileMapping.get(baseFileName).add(logFile.getFileName());
+            } else {
+              // all previous log files are expected to be matching
+              
baseFileToLogFileMapping.get(baseFileName).contains(logFile.getFileName());
+            }
+            // validate that base instant time matches
+            assertEquals(baseInstantTime, 
FSUtils.getDeltaCommitTimeFromLogPath(logFile.getPath()));
+            counter++;
+          }
+        }
+    );
+  }
+
+  private void validateNewData(List<HoodieRecord> newRecords) {
+    Dataset<Row> inputDf = 
spark().read().json(jsc().parallelize(recordsToStrings(newRecords), 
2)).drop("partition");
+    // get keys from the dataframe
+    List<String> updatedKeys = 
inputDf.select("_row_key").as(Encoders.STRING()).collectAsList();
+    Dataset<Row> outputDf = spark().read().format("hudi").load(basePath());
+    // drop metadata columns
+    outputDf = outputDf.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+        HoodieRecord.FILENAME_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
+    // filter the dataframe for updatedKeys only
+    outputDf = 
outputDf.filter(outputDf.col("_row_key").isin(updatedKeys.toArray()));
+    // assert that the dataframe is equal to the expected dataframe
+    // NOTE: we have excluded some columns from comparison such as map, date 
and array type fields as they were incompatible
+    // For example below is what data generated looks like vs what is read 
from the table (check `city_to_state` map: [CA] vs Map(LA -> CA))
+    //  
[false,029c1e56-3c03-42e3-a2eb-a45addd5b671,0.5550830309956531,0.013823731501093062,[CA],15,1322460250,1053705246,driver-002,0.8563083971473885,0.7050871729430999,
+    //         [39.649862113946796,USD], WrappedArray(0, 0, 8, 19, 
-72),Canada,2015/03/17,rider-002,-5190452608208752867,0,WrappedArray([88.29247239885966,USD]),BLACK,0.7458226]
+    //  
[false,029c1e56-3c03-42e3-a2eb-a45addd5b671,0.5550830309956531,0.013823731501093062,Map(LA
 -> 
CA),1970-01-16,1322460250,1053705246,driver-002,0.8563083971473885,0.7050871729430999,
+    //         
[39.649862113946796,USD],0.529336,[B@372d7420,2015/03/17,rider-002,-5190452608208752867,0,WrappedArray([88.29247239885966,USD]),BLACK,0.7458226]
+    assertTrue(areDataframesEqual(inputDf, outputDf, new 
HashSet<>(Arrays.asList("_hoodie_is_deleted", "_row_key", "begin_lat", 
"begin_lon",
+        "current_ts", "distance_in_meters", "driver", "end_lat", "end_lon", 
"fare"))), "Dataframe mismatch");
+  }
+
   // TODO: Enable metadata virtual keys in this test once the feature 
HUDI-2593 is completed
   @Test
   public void testLogFileCountsAfterCompaction() throws Exception {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 7c1b596643c..08a82284898 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -172,9 +172,20 @@ public class DataSourceTestUtils {
         .getLatestCompletionTime().orElse(null);
   }
 
+  public static String latestCommitRequestTime(HoodieStorage storage, String 
basePath) {
+    return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage, 
basePath)
+        .lastInstant().map(instant -> instant.requestedTime()).orElse(null);
+  }
+
   public static String latestDeltaCommitCompletionTime(HoodieStorage storage, 
String basePath) {
     return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage, 
basePath)
         .filter(instant -> 
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
         .getLatestCompletionTime().orElse(null);
   }
+
+  public static String latestDeltaCommitRequest(HoodieStorage storage, String 
basePath) {
+    return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage, 
basePath)
+        .filter(instant -> 
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
+        .lastInstant().map(instant -> instant.requestedTime()).orElse(null);
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 3de88e30284..30a32bdb814 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.{HoodieMemoryConfig, 
HoodieMetadataConfig,
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestUtils}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.Option
@@ -33,7 +33,7 @@ import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
 import org.apache.hudi.index.HoodieIndex.IndexType
-import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.{metadataPartitionExists, 
PARTITION_NAME_SECONDARY_INDEX_PREFIX}
+import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.{PARTITION_NAME_SECONDARY_INDEX_PREFIX,
 metadataPartitionExists}
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, 
HoodieSparkClientTestBase}
@@ -99,19 +99,22 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
   @ParameterizedTest
   @CsvSource(Array(
     // Inferred as COMMIT_TIME_ORDERING
-    "AVRO, AVRO, avro, false,,", "AVRO, SPARK, parquet, false,,",
-    "SPARK, AVRO, parquet, false,,", "SPARK, SPARK, parquet, false,,",
+    "AVRO, AVRO, avro, false,,,EIGHT", "AVRO, SPARK, parquet, false,,,EIGHT",
+    "SPARK, AVRO, parquet, false,,,EIGHT", "SPARK, SPARK, parquet, 
false,,,EIGHT",
     // EVENT_TIME_ORDERING without precombine field
-    "AVRO, AVRO, avro, false,,EVENT_TIME_ORDERING", "AVRO, SPARK, parquet, 
false,,EVENT_TIME_ORDERING",
-    "SPARK, AVRO, parquet, false,,EVENT_TIME_ORDERING", "SPARK, SPARK, 
parquet, false,,EVENT_TIME_ORDERING",
+    "AVRO, AVRO, avro, false,,EVENT_TIME_ORDERING,EIGHT", "AVRO, SPARK, 
parquet, false,,EVENT_TIME_ORDERING,EIGHT",
+    "SPARK, AVRO, parquet, false,,EVENT_TIME_ORDERING,EIGHT", "SPARK, SPARK, 
parquet, false,,EVENT_TIME_ORDERING,EIGHT",
     // EVENT_TIME_ORDERING with empty precombine field
-    "AVRO, AVRO, avro, true,,EVENT_TIME_ORDERING", "AVRO, SPARK, parquet, 
true,,EVENT_TIME_ORDERING",
-    "SPARK, AVRO, parquet, true,,EVENT_TIME_ORDERING", "SPARK, SPARK, parquet, 
true,,EVENT_TIME_ORDERING",
+    "AVRO, AVRO, avro, true,,EVENT_TIME_ORDERING,EIGHT", "AVRO, SPARK, 
parquet, true,,EVENT_TIME_ORDERING,EIGHT",
+    "SPARK, AVRO, parquet, true,,EVENT_TIME_ORDERING,EIGHT", "SPARK, SPARK, 
parquet, true,,EVENT_TIME_ORDERING,EIGHT",
     // Inferred as EVENT_TIME_ORDERING
-    "AVRO, AVRO, avro, true, timestamp,", "AVRO, SPARK, parquet, true, 
timestamp,",
-    "SPARK, AVRO, parquet, true, timestamp,", "SPARK, SPARK, parquet, true, 
timestamp,"))
+    "AVRO, AVRO, avro, true, timestamp,,EIGHT", "AVRO, SPARK, parquet, true, 
timestamp,,EIGHT",
+    "SPARK, AVRO, parquet, true, timestamp,,EIGHT", "SPARK, SPARK, parquet, 
true, timestamp,,EIGHT",
+    // test table version 6
+    "AVRO, AVRO, avro, true,timestamp,EVENT_TIME_ORDERING,SIX",
+    "AVRO, AVRO, avro, true,timestamp,COMMIT_TIME_ORDERING,SIX"))
   def testCount(readType: HoodieRecordType, writeType: HoodieRecordType, 
logType: String,
-                hasPreCombineField: Boolean, precombineField: String, 
recordMergeMode: String) {
+                hasPreCombineField: Boolean, precombineField: String, 
recordMergeMode: String, tableVersion: String): Unit = {
     var (_, readOpts) = getWriterReaderOpts(readType)
     var (writeOpts, _) = getWriterReaderOpts(writeType)
     readOpts = readOpts ++ 
Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logType)
@@ -123,14 +126,24 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       writeOpts = writeOpts ++ Map(DataSourceWriteOptions.PRECOMBINE_FIELD.key 
->
         (if (isNullOrEmpty(precombineField)) "" else precombineField))
     }
-    val firstWriteOpts = if (isNullOrEmpty(recordMergeMode)) {
-      writeOpts
-    } else {
-      writeOpts ++ Map(HoodieWriteConfig.RECORD_MERGE_MODE.key -> 
recordMergeMode)
+    if (!isNullOrEmpty(recordMergeMode)) {
+      writeOpts = writeOpts ++ Map(HoodieWriteConfig.RECORD_MERGE_MODE.key -> 
recordMergeMode)
     }
     if (isNullOrEmpty(recordMergeMode)) {
       assertFalse(writeOpts.contains(HoodieWriteConfig.RECORD_MERGE_MODE.key))
     }
+    val firstWriteOpts = if (tableVersion.equals("SIX")) {
+      writeOpts = writeOpts ++ Map(HoodieWriteConfig.WRITE_TABLE_VERSION.key() 
-> HoodieTableVersion.SIX.versionCode().toString)
+      writeOpts = writeOpts - HoodieWriteConfig.RECORD_MERGE_MODE.key
+      if (recordMergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING.name)) {
+        writeOpts = writeOpts ++ 
Map(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName)
+      } else if 
(recordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name)) {
+        writeOpts = writeOpts ++ 
Map(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName)
+      }
+      writeOpts
+    } else {
+      writeOpts
+    }
 
     // First Operation:
     // Producing parquet files to three default partitions.
@@ -155,7 +168,13 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     } else {
       recordMergeMode
     }
-    val expectedConfigs = (Map(HoodieTableConfig.RECORD_MERGE_MODE.key -> 
expectedMergeMode) ++
+    val expectedConfigs = (
+      if (tableVersion.equals("EIGHT")) {
+        Map(HoodieTableConfig.RECORD_MERGE_MODE.key -> expectedMergeMode,
+          HoodieTableConfig.VERSION.key -> 
HoodieTableVersion.EIGHT.versionCode().toString)
+      } else {
+        Map(HoodieTableConfig.VERSION.key -> 
HoodieTableVersion.SIX.versionCode().toString)
+      } ++
       (if (hasPreCombineField && !isNullOrEmpty(precombineField)) {
         Map(HoodieTableConfig.PRECOMBINE_FIELD.key -> precombineField)
       } else {
@@ -167,7 +186,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
     }).asJava
     HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, 
nonExistentConfigs)
-    val commit1CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    val commit1CompletionTime = if (tableVersion.equals("EIGHT")) {
+      DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    } else {
+      DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+    }
     val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -185,7 +208,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .mode(SaveMode.Append)
       .save(basePath)
     HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, 
nonExistentConfigs)
-    val commit2CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    val commit2CompletionTime = if (tableVersion.equals("EIGHT")) {
+      DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    } else {
+      DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+    }
     val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -198,49 +225,54 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, 
Seq("_hoodie_record_key"), "left").count())
 
     // incremental view
-    // base file only
-    val hudiIncDF1 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
-      .option(DataSourceReadOptions.END_COMMIT.key, commit1CompletionTime)
-      .load(basePath)
-    assertEquals(100, hudiIncDF1.count())
-    assertEquals(1, 
hudiIncDF1.select("_hoodie_commit_time").distinct().count())
-    assertEquals(commit1Time, 
hudiIncDF1.select("_hoodie_commit_time").head().get(0).toString)
-    hudiIncDF1.show(1)
-    // log file only
-    val hudiIncDF2 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
-      .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
-      .load(basePath)
-    assertEquals(100, hudiIncDF2.count())
-    assertEquals(1, 
hudiIncDF2.select("_hoodie_commit_time").distinct().count())
-    assertEquals(commit2Time, 
hudiIncDF2.select("_hoodie_commit_time").head().get(0).toString)
-    hudiIncDF2.show(1)
-
-    // base file + log file
-    val hudiIncDF3 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
-      .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
-      .load(basePath)
-    assertEquals(100, hudiIncDF3.count())
-    // log file being load
-    assertEquals(1, 
hudiIncDF3.select("_hoodie_commit_time").distinct().count())
-    assertEquals(commit2Time, 
hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
-
-    // Test incremental query has no instant in range
-    val emptyIncDF = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, "000")
-      .option(DataSourceReadOptions.END_COMMIT.key, "001")
-      .load(basePath)
-    assertEquals(0, emptyIncDF.count())
+    // validate incremental queries only for table version 8
+    // 1.0 reader (table version 8) supports incremental query reads using 
completion time
+    if (tableVersion.equals("EIGHT")) {
+      // base file only
+      val hudiIncDF1 = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+        .option(DataSourceReadOptions.END_COMMIT.key, commit1CompletionTime)
+        .load(basePath)
+      assertEquals(100, hudiIncDF1.count())
+      assertEquals(1, 
hudiIncDF1.select("_hoodie_commit_time").distinct().count())
+      assertEquals(commit1Time, 
hudiIncDF1.select("_hoodie_commit_time").head().get(0).toString)
+      hudiIncDF1.show(1)
+
+      // log file only
+      val hudiIncDF2 = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+        .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
+        .load(basePath)
+      assertEquals(100, hudiIncDF2.count())
+      assertEquals(1, 
hudiIncDF2.select("_hoodie_commit_time").distinct().count())
+      assertEquals(commit2Time, 
hudiIncDF2.select("_hoodie_commit_time").head().get(0).toString)
+      hudiIncDF2.show(1)
+
+      // base file + log file
+      val hudiIncDF3 = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+        .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
+        .load(basePath)
+      assertEquals(100, hudiIncDF3.count())
+      // log file being load
+      assertEquals(1, 
hudiIncDF3.select("_hoodie_commit_time").distinct().count())
+      assertEquals(commit2Time, 
hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
+
+      // Test incremental query has no instant in range
+      val emptyIncDF = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, "000")
+        .option(DataSourceReadOptions.END_COMMIT.key, "001")
+        .load(basePath)
+      assertEquals(0, emptyIncDF.count())
+    }
 
     // Unmerge
     val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
@@ -269,7 +301,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .mode(SaveMode.Append)
       .save(basePath)
     HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, 
nonExistentConfigs)
-    val commit3CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    val commit3CompletionTime = if (tableVersion.equals("EIGHT")) {
+      DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    } else {
+      DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+    }
     val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -284,22 +320,26 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", 
"_hoodie_commit_time"), "inner").count())
 
     // incremental query from commit2Time
-    val hudiIncDF4 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
-      .load(basePath)
-    assertEquals(50, hudiIncDF4.count())
-
-    // skip merge incremental view
-    // including commit 2 and commit 3
-    val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
-      .option(DataSourceReadOptions.REALTIME_MERGE.key, 
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
-      .load(basePath)
-    assertEquals(250, hudiIncDF4SkipMerge.count())
+    // validate incremental queries only for table version 8
+    // 1.0 reader (table version 8) supports incremental query reads using 
completion time
+    if (tableVersion.equals("EIGHT")) {
+      val hudiIncDF4 = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+        .load(basePath)
+      assertEquals(50, hudiIncDF4.count())
+
+      // skip merge incremental view
+      // including commit 2 and commit 3
+      val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+        .option(DataSourceReadOptions.REALTIME_MERGE.key, 
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+        .load(basePath)
+      assertEquals(250, hudiIncDF4SkipMerge.count())
+    }
 
     // Fourth Operation:
     // Insert records to a new partition. Produced a new parquet file.
@@ -324,12 +364,16 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), 
"inner").count())
 
     // Incremental query, 50 from log file, 100 from base file of the new 
partition.
-    val hudiIncDF5 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
-      .load(basePath)
-    assertEquals(150, hudiIncDF5.count())
+    // validate incremental queries only for table version 8
+    // 1.0 reader (table version 8) supports incremental query reads using 
completion time
+    if (tableVersion.equals("EIGHT")) {
+      val hudiIncDF5 = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+        .load(basePath)
+      assertEquals(150, hudiIncDF5.count())
+    }
 
     // Fifth Operation:
     // Upsert records to the new partition. Produced a newer version of 
parquet file.
@@ -358,21 +402,29 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .mode(SaveMode.Append)
       .save(basePath)
     HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, 
nonExistentConfigs)
-    val commit6CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    val commit6CompletionTime = if (tableVersion.equals("EIGHT")) {
+      DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    } else {
+      DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+    }
     val hudiSnapshotDF6 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/2020/01/10/*")
     assertEquals(102, hudiSnapshotDF6.count())
-    val hudiIncDF6 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit6CompletionTime)
-      .option(DataSourceReadOptions.END_COMMIT.key, commit6CompletionTime)
-      .load(basePath)
-    // even though compaction updated 150 rows, since preserve commit metadata 
is true, they won't be part of incremental query.
-    // inserted 2 new row
-    assertEquals(2, hudiIncDF6.count())
+    // validate incremental queries only for table version 8
+    // 1.0 reader (table version 8) supports incremental query reads using 
completion time
+    if (tableVersion.equals("EIGHT")) {
+      val hudiIncDF6 = spark.read.format("org.apache.hudi")
+        .options(readOpts)
+        .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit6CompletionTime)
+        .option(DataSourceReadOptions.END_COMMIT.key, commit6CompletionTime)
+        .load(basePath)
+      // even though compaction updated 150 rows, since preserve commit 
metadata is true, they won't be part of incremental query.
+      // inserted 2 new row
+      assertEquals(2, hudiIncDF6.count())
+    }
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index 67e7d992b19..5161e622a7c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -28,12 +28,11 @@ import 
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConf
 
 class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 
-  // TODO(HUDI-8938): add "mor,true,true,6" after the fix
   Seq(
     "cow,8,false,false", "cow,8,false,true", "cow,8,true,false",
     "cow,6,true,false", "cow,6,true,true",
     "mor,8,false,false", "mor,8,false,true", "mor,8,true,false",
-    "mor,6,true,false").foreach { args =>
+    "mor,6,true,true").foreach { args =>
     val argList = args.split(',')
     val tableType = argList(0)
     val tableVersion = argList(1)

Reply via email to