This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8298cf8d4010 fix(query): Change start commit time to be exclusive in 
incremental query on Spark (#13982)
8298cf8d4010 is described below

commit 8298cf8d4010ab3b844a66fad315b4438db8af45
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Sep 26 16:48:11 2025 -0700

    fix(query): Change start commit time to be exclusive in incremental query 
on Spark (#13982)
---
 docker/demo/sparksql-incremental.commands          |  8 +--
 .../common/table/timeline/BaseHoodieTimeline.java  |  2 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  4 +-
 .../hudi/HoodieHadoopFsRelationFactory.scala       | 12 ++---
 .../hudi/MergeOnReadIncrementalRelationV2.scala    |  2 +-
 .../sql/hudi/streaming/HoodieStreamSourceV2.scala  |  4 +-
 .../hudi/functional/TestIncrementalQueries.java    |  8 ++-
 .../apache/hudi/functional/TestCOWDataSource.scala | 30 ++++--------
 .../hudi/functional/TestCOWDataSourceStorage.scala | 10 ++--
 .../functional/TestColumnStatsIndexWithSQL.scala   |  4 +-
 .../functional/TestDataSourceForBootstrap.scala    |  7 ++-
 .../TestIncrementalReadWithFullTableScan.scala     |  6 +--
 .../apache/hudi/functional/TestMORDataSource.scala | 57 +++++++++-------------
 .../hudi/functional/TestSparkDataSource.scala      |  5 +-
 .../hudi/functional/TestStructuredStreaming.scala  |  4 +-
 .../apache/spark/sql/hudi/common/TestSqlConf.scala |  4 +-
 .../dml/others/TestHoodieTableValuedFunction.scala |  6 +--
 .../sql/hudi/dml/others/TestMergeIntoTable.scala   |  6 +--
 .../sql/hudi/feature/TestCDCForSparkSQL.scala      | 45 ++++++++---------
 .../hudi/procedure/TestCopyToTableProcedure.scala  |  2 +-
 .../hudi/utilities/sources/HoodieIncrSource.java   | 17 ++++---
 21 files changed, 104 insertions(+), 139 deletions(-)

diff --git a/docker/demo/sparksql-incremental.commands 
b/docker/demo/sparksql-incremental.commands
index 8f58ef8e4caf..fa77582e8bfd 100644
--- a/docker/demo/sparksql-incremental.commands
+++ b/docker/demo/sparksql-incremental.commands
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 
 val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
 
-val startCompletionTime = HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_cow", "00000").get(1)
+val startCompletionTime = HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_cow", "00000").get(0)
 println("Begin completion time for COW incremental query: " + 
startCompletionTime)
 val hoodieIncQueryDF =  spark.read.format("hudi").
                       option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
@@ -38,7 +38,7 @@ println("stock_ticks_cow incremental count: " + 
hoodieIncQueryDF.count)
 hoodieIncQueryDF.registerTempTable("stock_ticks_cow_incr")
 spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from 
stock_ticks_cow_incr where  symbol = 'GOOG'").show(100, false);
 
-val bsStartCompletionTime = 
HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_cow_bs", "00000000000000").get(1)
+val bsStartCompletionTime = 
HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_cow_bs", "00000000000000").get(0)
 println("Begin completion time for bootstrap COW incremental query: " + 
bsStartCompletionTime)
 val hoodieIncQueryBsDF =  spark.read.format("hudi").
                       option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
@@ -48,7 +48,7 @@ println("stock_ticks_cow_bs incremental count: " + 
hoodieIncQueryBsDF.count)
 hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
 spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from 
stock_ticks_cow_bs_incr where  symbol = 'GOOG'").show(100, false);
 
-val morStartCompletionTime = 
HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_mor", "00000").get(1)
+val morStartCompletionTime = 
HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_mor", "00000").get(0)
 println("Begin completion time for MOR incremental query: " + 
morStartCompletionTime)
 val hoodieMorIncQueryDF =  spark.read.format("hudi").
                       option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
@@ -58,7 +58,7 @@ println("stock_ticks_mor incremental count: " + 
hoodieMorIncQueryDF.count)
 hoodieMorIncQueryDF.registerTempTable("stock_ticks_mor_incr")
 spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from 
stock_ticks_mor_incr where symbol = 'GOOG'").show(100, false);
 
-val bsMorStartCompletionTime = 
HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_mor_bs", "00000000000000").get(1)
+val bsMorStartCompletionTime = 
HoodieDataSourceHelpers.listCompletionTimeSince(fs, 
"/user/hive/warehouse/stock_ticks_mor_bs", "00000000000000").get(0)
 println("Begin completion time for bootstrap MOR incremental query: " + 
bsMorStartCompletionTime)
 val hoodieMorIncQueryBsDF =  spark.read.format("hudi").
                       option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
index 331db8713faa..cc559d111f3d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
@@ -263,7 +263,7 @@ public abstract class BaseHoodieTimeline implements 
HoodieTimeline {
   @Override
   public HoodieTimeline findInstantsInRangeByCompletionTime(String startTs, 
String endTs) {
     return factory.createDefaultTimeline(
-        getInstantsAsStream().filter(s -> s.getCompletionTime() != null && 
InstantComparison.isInClosedRange(s.getCompletionTime(), startTs, endTs)),
+        getInstantsAsStream().filter(s -> s.getCompletionTime() != null && 
InstantComparison.isInRange(s.getCompletionTime(), startTs, endTs)),
         getInstantReader());
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 54ca53794e34..fb78db78c51f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -345,8 +345,8 @@ object DefaultSource {
           case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
             (hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
               case (true, true) => new 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
-                sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable, RangeType.CLOSED_CLOSED).build()
-              case (true, false) => new IncrementalRelationV2(sqlContext, 
parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
+                sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
+              case (true, false) => new IncrementalRelationV2(sqlContext, 
parameters, userSchema, metaClient, RangeType.OPEN_CLOSED)
               case (false, true) => new 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
                 sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
               case (false, false) => new IncrementalRelationV1(sqlContext, 
parameters, userSchema, metaClient)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 9221e4f0ff78..9ee1663fea65 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -20,21 +20,15 @@ package org.apache.hudi
 
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, 
isSchemaEvolutionEnabledOnRead}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.HoodieFileIndex.getConfigProperties
-import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, 
HoodieReaderConfig, TypedProperties}
-import 
org.apache.hudi.common.config.HoodieMetadataConfig.{DEFAULT_METADATA_ENABLE_FOR_READERS,
 ENABLE}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.log.InstantRange.RangeType
 import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.HoodieTableMetadataUtil
 import org.apache.hudi.storage.StoragePath
 
 import org.apache.avro.Schema
@@ -334,7 +328,7 @@ class 
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(override val sqlCont
                                                             override val 
options: Map[String, String],
                                                             override val 
schemaSpec: Option[StructType],
                                                             isBootstrap: 
Boolean,
-                                                            rangeType: 
RangeType = RangeType.CLOSED_CLOSED)
+                                                            rangeType: 
RangeType = RangeType.OPEN_CLOSED)
   extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap,
     MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, 
schemaSpec, None, rangeType))
 
@@ -434,7 +428,7 @@ class 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(override val sqlCont
                                                             override val 
options: Map[String, String],
                                                             override val 
schemaSpec: Option[StructType],
                                                             isBootstrap: 
Boolean,
-                                                            rangeType: 
RangeType)
+                                                            rangeType: 
RangeType = RangeType.OPEN_CLOSED)
   extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap,
     MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, 
schemaSpec, None, rangeType))
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
index c2795325d688..f104f2cfc0f3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
@@ -52,7 +52,7 @@ case class MergeOnReadIncrementalRelationV2(override val 
sqlContext: SQLContext,
                                             override val metaClient: 
HoodieTableMetaClient,
                                             private val userSchema: 
Option[StructType],
                                             private val prunedDataSchema: 
Option[StructType] = None,
-                                            override val rangeType: RangeType 
= RangeType.CLOSED_CLOSED)
+                                            override val rangeType: RangeType 
= RangeType.OPEN_CLOSED)
   extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, 
Seq(), userSchema, prunedDataSchema)
     with HoodieIncrementalRelationV2Trait with MergeOnReadIncrementalRelation {
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index f5cb59cdf16b..3eebc4639179 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -170,7 +170,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
                 .map(serDe.serializeRow)
             case HoodieTableType.MERGE_ON_READ =>
               val requiredColumns = schema.fields.map(_.name)
-              new MergeOnReadIncrementalRelationV2(sqlContext, incParams, 
metaClient, Some(schema), rangeType = rangeType)
+              MergeOnReadIncrementalRelationV2(sqlContext, incParams, 
metaClient, Some(schema), rangeType = rangeType)
                 .buildScan(requiredColumns, Array.empty[Filter])
                 .asInstanceOf[RDD[InternalRow]]
             case _ => throw new IllegalArgumentException(s"UnSupport 
tableType: $tableType")
@@ -186,7 +186,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
         startOffset.offsetCommitTime, RangeType.CLOSED_CLOSED)
       case HoodieSourceOffset(completionTime) => (
         completionTime, RangeType.OPEN_CLOSED)
-      case _=> throw new IllegalStateException("UnKnow offset type.")
+      case _=> throw new IllegalStateException("Unknown offset type.")
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
index 0563e6364004..c150c97b6d03 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
@@ -75,11 +75,9 @@ class TestIncrementalQueries extends 
SparkClientFunctionalTestHarness {
     // Run incremental query for CASE 1: last two commits are within the range.
     // Make sure the records from the second commit are included.
     // This avoids the differences between different versions. That is,
-    // the range type of table version 6 is open_close, but that of > 6 is 
close_close by default.
-    String startTimestamp = 
String.valueOf(Long.valueOf(sortedInstants.get(1).getRight()) - 1);
-    Dataset<Row> result = spark().read().format("org.apache.hudi")
+    Dataset<Row> result = spark().read().format("hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
-        .option(DataSourceReadOptions.START_COMMIT().key(), startTimestamp)
+        .option(DataSourceReadOptions.START_COMMIT().key(), 
sortedInstants.get(0).getRight())
         .option(DataSourceReadOptions.END_COMMIT().key(), 
sortedInstants.get(2).getRight()).load(path);
     // Only records from the last two commits should be returned.
     assertEquals(3,
@@ -97,7 +95,7 @@ class TestIncrementalQueries extends 
SparkClientFunctionalTestHarness {
 
     // Run incremental query for CASE 2: start time is larger than the newest 
instant.
     // That is, no instances would fall into this range.
-    startTimestamp = 
String.valueOf(Long.valueOf(sortedInstants.get(2).getRight()) + 100);
+    String startTimestamp = 
String.valueOf(Long.valueOf(sortedInstants.get(2).getRight()) + 100);
     String endTimestamp = 
String.valueOf(Long.valueOf(sortedInstants.get(2).getRight()) + 200);
     result = spark().read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 6f56f466bc9e..96d9685638a1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -637,6 +637,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       .mode(SaveMode.Overwrite)
       .save(basePath)
     metaClient = createMetaClient(spark, basePath)
+    val commit1CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
 
     val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02"))
     val records2 = recordsToStrings(dataGen2.generateInserts("002", 
30)).asScala.toList
@@ -657,7 +658,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     val incrementalQueryRes = spark.read.format("hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+      .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
       .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
       .load(basePath)
     assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 
0)
@@ -1146,6 +1147,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
+    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     val hoodieROViewDF1 = 
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
     assertEquals(insert1Cnt, hoodieROViewDF1.count())
 
@@ -1159,14 +1161,13 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
       .option(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "true")
       .mode(SaveMode.Append)
       .save(basePath)
-    val commitCompletionTime2 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     val hoodieROViewDF2 = 
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
     assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
 
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
       .load(basePath)
     assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
   }
@@ -1462,6 +1463,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
+    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     val countIn20160315 = records1.asScala.count(record => 
record.getPartitionPath == "2016/03/15")
     val pathForReader = getPathForReader(basePath, !enableFileIndex, if 
(partitionEncode) 1 else 3)
     // query the partition by filter
@@ -1492,13 +1494,12 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)
-    val commitCompletionTime2 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
 
     // Incremental query without "*" in path
     val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
     assertEquals(false, Metrics.isInitialized(basePath))
@@ -2174,7 +2175,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     )
 
     // Helper method to get commit time based on table version
-    // v6 uses requestedTime, v9+ uses completionTime
+    // v6 uses requestedTime, v8+ uses completionTime
     def getCommitTime(tableVersion: String): String = {
       metaClient.reloadActiveTimeline()
       val lastInstant = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
@@ -2195,19 +2196,6 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
       getCommitTime(tableVersion)
     }
 
-    // Helper method to adjust start time for incremental queries based on 
version
-    // v6: open_close range (START exclusive, END inclusive)
-    // v9: close_close range (both START and END inclusive)
-    def getIncrementalStartTime(commitTime: String, tableVersion: String): 
String = {
-      if (tableVersion == "6") {
-        // v6: open_close - need to use time just before to include the commit
-        (commitTime.toLong - 1).toString
-      } else {
-        // v9: close_close - use the actual commit time
-        commitTime
-      }
-    }
-
     // Commit c1 - Initial Insert (10 records with timestamp 1000)
     val df1 = Seq(
       (1, "val1", 1000L, false),
@@ -2334,7 +2322,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     // - v9: Uses close_close range (both START and END inclusive)
     val incrementalDf1 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit1, tableVersion))
+      .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .option(DataSourceReadOptions.END_COMMIT.key, commit3)
       .load(basePath)
       .select("id", "value", "timestamp")
@@ -2370,7 +2358,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     // Should include changes from c3, c4 and c5
     val incrementalDf2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit3, tableVersion))
+      .option(DataSourceReadOptions.START_COMMIT.key, commit2)
       .load(basePath) // No END_COMMIT means up to latest
       .select("id", "value", "timestamp")
       .orderBy("id")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 2b73b1155d0b..8986a8ff55f6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -144,6 +144,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Append)
       .save(basePath)
 
+    val completionTime2 = DataSourceTestUtils.latestCommitCompletionTime(fs, 
basePath)
     val snapshotDF2 = spark.read.format("hudi")
       .options(readOptions)
       .load(basePath)
@@ -179,7 +180,6 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .save(basePath)
 
     val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    val completionTime3 = DataSourceTestUtils.latestCommitCompletionTime(fs, 
basePath)
     assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").size())
 
     // Snapshot Query
@@ -196,7 +196,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
       .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
+      .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
       .load(basePath)
     assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be 
pulled
@@ -226,7 +226,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
       .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
+      .option(DataSourceReadOptions.START_COMMIT.key, completionTime2)
       .load(basePath)
 
     assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must 
be pulled
@@ -238,7 +238,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
       .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
+      .option(DataSourceReadOptions.START_COMMIT.key, completionTime2)
       .option(DataSourceReadOptions.INCR_PATH_GLOB.key, if 
(isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
       .load(basePath)
     assertEquals(hoodieIncViewDF2
@@ -247,7 +247,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val timeTravelDF = spark.read.format("org.apache.hudi")
       .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
+      .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
       .load(basePath)
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be 
pulled
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 6e2304b09b58..7bc7acb98041 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -19,7 +19,7 @@
 package org.apache.hudi.functional
 
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieFileIndex}
-import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, 
ORDERING_FIELDS, RECORDKEY_FIELD}
+import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, 
RECORDKEY_FIELD}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
@@ -722,7 +722,7 @@ class TestColumnStatsIndexWithSQL extends 
ColumnStatIndexTestBase {
     val numRecordsForSecondQueryWithDataSkipping = 
spark.sql(secondQuery).count()
 
     if 
(queryType.equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      createIncrementalSQLTable(commonOpts, 
metaClient.reloadActiveTimeline().getInstants.get(2).getCompletionTime)
+      createIncrementalSQLTable(commonOpts, 
metaClient.reloadActiveTimeline().getInstants.get(1).getCompletionTime)
       assertEquals(spark.sql(firstQuery).count(), if (isLastOperationDelete) 0 
else 3)
       assertEquals(spark.sql(secondQuery).count(), if (isLastOperationDelete) 
0 else 2)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 7fe676065af8..7d5ca083107e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.table.timeline.{HoodieInstantTimeGenerator, 
HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieClusteringConfig, 
HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, 
sort}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -660,12 +660,11 @@ class TestDataSourceForBootstrap {
     assertEquals(1, countsPerCommit.length)
     assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
 
-    val startCompletionTime = 
HoodieInstantTimeGenerator.instantTimePlusMillis(bootstrapCommitCompletionTime, 
1)
     // incrementally pull only changes after bootstrap commit, which would 
pull only the updated records in the
     // later commits
     val hoodieIncViewDF2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, startCompletionTime)
+      .option(DataSourceReadOptions.START_COMMIT.key, 
bootstrapCommitCompletionTime)
       .load(basePath)
 
     assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
@@ -678,7 +677,7 @@ class TestDataSourceForBootstrap {
       // pull the update commits within certain partitions
       val hoodieIncViewDF3 = spark.read.format("hudi")
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, startCompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, 
bootstrapCommitCompletionTime)
         .option(DataSourceReadOptions.INCR_PATH_GLOB.key, 
relativePartitionPath)
         .load(basePath)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 714a33a0a576..dc6a8b5d760d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -112,10 +112,10 @@ class TestIncrementalReadWithFullTableScan extends 
HoodieSparkClientTestBase {
     assertTrue(nArchivedInstants >= 3)
 
     //Anything less than 2 is a valid commit in the sense no cleanup has been 
done for those commit files
-    val startUnarchivedCompletionTs = 
completedCommits.nthInstant(1).get().getCompletionTime //C5 completion
+    val startUnarchivedCompletionTs = 
completedCommits.nthInstant(0).get().getCompletionTime //instant before C5 (start is now exclusive)
     val endUnarchivedCompletionTs = 
completedCommits.nthInstant(1).get().getCompletionTime //C5 completion
 
-    val startArchivedCompletionTs = 
archivedInstants(1).asInstanceOf[HoodieInstant].getCompletionTime //C1 
completion
+    val startArchivedCompletionTs = 
archivedInstants(0).asInstanceOf[HoodieInstant].getCompletionTime //instant before C1 
(start is now exclusive)
     val endArchivedCompletionTs = 
archivedInstants(1).asInstanceOf[HoodieInstant].getCompletionTime //C1 
completion
 
     val instant = Instant.now()
@@ -152,7 +152,7 @@ class TestIncrementalReadWithFullTableScan extends 
HoodieSparkClientTestBase {
 
     // Test both start commit and end commits is not archived and not cleaned
     val reversedCommits = completedCommits.getReverseOrderedInstants.toArray
-    val startUncleanedCompletionTs = 
reversedCommits.apply(0).asInstanceOf[HoodieInstant].getCompletionTime
+    val startUncleanedCompletionTs = 
reversedCommits.apply(1).asInstanceOf[HoodieInstant].getCompletionTime
     val endUncleanedCompletionTs = 
reversedCommits.apply(0).asInstanceOf[HoodieInstant].getCompletionTime
     runIncrementalQueryAndCompare(startUncleanedCompletionTs, 
endUncleanedCompletionTs, 1, true)
     runIncrementalQueryAndCompare(startUncleanedCompletionTs, 
endUncleanedCompletionTs, 1, false)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 03dcf79b3992..1948a49ebf8a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -262,7 +262,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       val hudiIncDF1 = spark.read.format("org.apache.hudi")
         .options(readOpts)
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, "000")
         .option(DataSourceReadOptions.END_COMMIT.key, commit1CompletionTime)
         .load(basePath)
       assertEquals(100, hudiIncDF1.count())
@@ -274,7 +274,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       val hudiIncDF2 = spark.read.format("org.apache.hudi")
         .options(readOpts)
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
         .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
         .load(basePath)
       assertEquals(100, hudiIncDF2.count())
@@ -286,7 +286,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       val hudiIncDF3 = spark.read.format("org.apache.hudi")
         .options(readOpts)
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, "000")
         .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
         .load(basePath)
       assertEquals(100, hudiIncDF3.count())
@@ -331,11 +331,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .mode(SaveMode.Append)
       .save(basePath)
     HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, 
nonExistentConfigs)
-    val commit3CompletionTime = if 
(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
-      DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
-    } else {
-      DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
-    }
     val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -356,7 +351,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       val hudiIncDF4 = spark.read.format("org.apache.hudi")
         .options(readOpts)
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
         .load(basePath)
       assertEquals(50, hudiIncDF4.count())
 
@@ -365,7 +360,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
         .options(readOpts)
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, "000")
         .option(DataSourceReadOptions.REALTIME_MERGE.key, 
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
         .load(basePath)
       assertEquals(250, hudiIncDF4SkipMerge.count())
@@ -400,7 +395,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       val hudiIncDF5 = spark.read.format("org.apache.hudi")
         .options(readOpts)
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
         .load(basePath)
       assertEquals(150, hudiIncDF5.count())
     }
@@ -415,6 +410,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .options(writeOpts)
       .mode(SaveMode.Append)
       .save(basePath)
+    val commit5CompletionTime = if 
(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    } else {
+      DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+    }
     HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, 
nonExistentConfigs)
     val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
       .options(readOpts)
@@ -448,7 +448,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       val hudiIncDF6 = spark.read.format("org.apache.hudi")
         .options(readOpts)
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commit6CompletionTime)
+        .option(DataSourceReadOptions.START_COMMIT.key, commit5CompletionTime)
         .option(DataSourceReadOptions.END_COMMIT.key, commit6CompletionTime)
         .load(basePath)
       // even though compaction updated 150 rows, since preserve commit 
metadata is true, they won't be part of incremental query.
@@ -510,6 +510,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
+    val commit1CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
     val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
       .options(readOpts)
@@ -526,7 +527,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .options(writeOpts)
       .mode(SaveMode.Append)
       .save(basePath)
-    val commit2CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -550,7 +550,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     val hudiIncDF1 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+      .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
       .load(basePath)
     assertEquals(0, hudiIncDF1.count())
 
@@ -595,6 +595,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key, 
classOf[DefaultHoodieRecordPayload].getName)
       .mode(SaveMode.Overwrite)
       .save(basePath)
+    val commit1CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -618,7 +619,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .options(opts)
       .mode(SaveMode.Append)
       .save(basePath)
-    val commit2CompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -637,7 +637,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     val hudiIncDF2 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, if (tableVersion == 6) 
commit1Time else commit2CompletionTime)
+      .option(DataSourceReadOptions.START_COMMIT.key, if (tableVersion == 6) 
commit1Time else commit1CompletionTime)
       .load(basePath)
 
     // filter first commit and only read log records
@@ -905,12 +905,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)
-    val commitCompletionTime2 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     // Incremental query without "*" in path
     val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
   }
@@ -1300,6 +1299,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .mode(SaveMode.Append)
       .save(basePath)
     val commit2Time = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+    val commit2CompletionTime = 
metaClient.getActiveTimeline.lastInstant().get().getCompletionTime
 
     val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003", 
20)).asScala.toSeq
     val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
@@ -1308,7 +1308,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .mode(SaveMode.Append)
       .save(basePath)
     val commit3Time = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-    val commit3CompletionTime = 
metaClient.reloadActiveTimeline.lastInstant().get().getCompletionTime
+    val commit3CompletionTime = 
metaClient.getActiveTimeline.lastInstant().get().getCompletionTime
 
     val pathForROQuery = getPathForROQuery(basePath, !enableFileIndex, 3)
     // snapshot query
@@ -1332,7 +1332,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     val incrementalQueryRes = spark.read.format("hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+      .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
       .option(DataSourceReadOptions.END_COMMIT.key, commit3CompletionTime)
       .load(basePath)
     assertEquals(0, incrementalQueryRes.where("partition = 
'2022-01-01'").count)
@@ -1980,19 +1980,6 @@ class TestMORDataSource extends 
HoodieSparkClientTestBase with SparkDatasetMixin
       getCommitTime(tableVersion)
     }
 
-    // Helper method to adjust start time for incremental queries based on 
version
-    // v6: open_close range (START exclusive, END inclusive)
-    // v9: close_close range (both START and END inclusive)
-    def getIncrementalStartTime(commitTime: String, tableVersion: String): 
String = {
-      if (tableVersion == "6") {
-        // v6: open_close - need to use time just before to include the commit
-        (commitTime.toLong - 1).toString
-      } else {
-        // v9: close_close - use the actual commit time
-        commitTime
-      }
-    }
-
     // Commit c1 - Initial Insert (10 records with timestamp 1000)
     val df1 = Seq(
       (1, "val1", 1000L, false),
@@ -2119,7 +2106,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     // - v9: Uses close_close range (both START and END inclusive)
     val incrementalDf1 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit1, tableVersion))
+      .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .option(DataSourceReadOptions.END_COMMIT.key, commit3)
       .load(basePath)
       .select("id", "value", "timestamp")
@@ -2155,7 +2142,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     // Should include changes from c3, c4 and c5
     val incrementalDf2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit3, tableVersion))
+      .option(DataSourceReadOptions.START_COMMIT.key, commit2)
       .load(basePath) // No END_COMMIT means up to latest
       .select("id", "value", "timestamp")
       .orderBy("id")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 537469bffef5..123d23954efa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -113,6 +113,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Append)
       .save(basePath)
     val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val commitCompletionTime2 = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
 
     val snapshotDf2 = spark.read.format("org.apache.hudi")
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
@@ -147,7 +148,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
     val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").get(0)
     val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
+      .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .option(DataSourceReadOptions.END_COMMIT.key, commitCompletionTime1)
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
       .load(basePath)
@@ -166,7 +167,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
     // another incremental query with commit2 and commit3
     val hoodieIncViewDf2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime3)
+      .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
       .option(DataSourceReadOptions.END_COMMIT.key(), commitCompletionTime3)
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
       .load(basePath)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 834173ad3392..2ae305a8ccc0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -177,7 +177,7 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
       val firstCommit = HoodieDataSourceHelpers.listCommitsSince(storage, 
destPath, "000").get(0)
       val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
+        .option(DataSourceReadOptions.START_COMMIT.key, "000")
         .option(DataSourceReadOptions.END_COMMIT.key, commitCompletionTime1)
         .load(destPath)
       assertEquals(100, hoodieIncViewDF1.count())
@@ -189,7 +189,7 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
       // pull the latest commit
       val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+        .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
         .load(destPath)
 
       assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must 
be pulled
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index 0bfaa580af72..0f0af4363cce 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -66,11 +66,11 @@ class TestSqlConf extends HoodieSparkSqlTestBase with 
BeforeAndAfter {
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 
$partitionVal)")
 
       val metaClient = createMetaClient(spark, tablePath)
+      val commitCompletionTime1 = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getCompletionTime
 
       // Then insert another new record
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 
$partitionVal)")
 
-      val commitCompletionTime2 = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getCompletionTime
       checkAnswer(s"select id, name, price, ts, year from $tableName")(
         Seq(1, "a1", 10.0, 1000, partitionVal),
         Seq(2, "a2", 10.0, 1000, partitionVal)
@@ -87,7 +87,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with 
BeforeAndAfter {
       // Manually pass incremental configs to global configs to make sure Hudi 
query is able to load the
       // global configs
       DFSPropertiesConfiguration.addToGlobalProps(QUERY_TYPE.key, 
QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      DFSPropertiesConfiguration.addToGlobalProps(START_COMMIT.key, 
commitCompletionTime2)
+      DFSPropertiesConfiguration.addToGlobalProps(START_COMMIT.key, 
commitCompletionTime1)
       spark.catalog.refreshTable(tableName)
       checkAnswer(s"select id, name, price, ts, year from $tableName")(
         Seq(2, "a2", 10.0, 1000, partitionVal)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
index bf4f4eb86d5c..ce1b7cc9849a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
@@ -129,6 +129,7 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
         )
 
         val fs = HadoopFSUtils.getFs(tablePath, 
spark.sessionState.newHadoopConf())
+        val firstCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
 
         checkAnswer(
           s"""select id,
@@ -150,7 +151,6 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
              | """.stripMargin
         )
         val secondCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
-        val secondInstant = spark.sql(s"select max(_hoodie_commit_time) as 
commitTime from  $tableName order by commitTime").first().getString(0)
 
         checkAnswer(
           s"""select id,
@@ -160,7 +160,7 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
              |from hudi_table_changes(
              |'$identifier',
              |'latest_state',
-             |'$secondCompletionTime')
+             |'$firstCompletionTime')
              |""".stripMargin
         )(
           Seq(1, "a1_1", 10.0, 1100),
@@ -184,7 +184,7 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
              | from hudi_table_changes(
              | '$identifier',
              | 'latest_state',
-             | '$secondCompletionTime',
+             | '$firstCompletionTime',
              | '$secondCompletionTime')
              | """.stripMargin
         )(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index 884d646d6aeb..243ebd93b414 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -1043,10 +1043,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
         checkAnswer(s"select id, name, price, _ts from $targetTable")(
           Seq(1, "a1", 12, 1001)
         )
+        val secondCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
         // Test incremental query
         val hudiIncDF1 = spark.read.format("org.apache.hudi")
           .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-          .option(DataSourceReadOptions.START_COMMIT.key, firstCompletionTime)
+          .option(DataSourceReadOptions.START_COMMIT.key, "000")
           .option(DataSourceReadOptions.END_COMMIT.key, firstCompletionTime)
           .load(targetBasePath)
         hudiIncDF1.createOrReplaceTempView("inc1")
@@ -1067,11 +1068,10 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
           Seq(1, "a1", 12, 1001),
           Seq(2, "a2", 10, 1001)
         )
-        val thirdCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
         // Test incremental query
         val hudiIncDF2 = spark.read.format("org.apache.hudi")
           .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-          .option(DataSourceReadOptions.START_COMMIT.key, thirdCompletionTime)
+          .option(DataSourceReadOptions.START_COMMIT.key, secondCompletionTime)
           .load(targetBasePath)
         hudiIncDF2.createOrReplaceTempView("inc2")
         checkAnswer(s"select id, name, price, _ts from inc2 order by id")(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
index e316559aa515..a6b3e1654ff1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
@@ -34,13 +34,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
 
 class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
 
-  def cdcDataFrame(basePath: String, startingTs: Long, endingTs: Option[Long] 
= None): DataFrame = {
+  def cdcDataFrame(basePath: String, startingTs: String, endingTs: 
Option[String] = None): DataFrame = {
     val reader = spark.read.format("hudi")
       .option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(INCREMENTAL_FORMAT.key, INCREMENTAL_FORMAT_CDC_VAL)
-      .option(START_COMMIT.key, startingTs.toString)
+      .option(START_COMMIT.key, startingTs)
     endingTs.foreach { ts =>
-      reader.option(END_COMMIT.key, ts.toString)
+      reader.option(END_COMMIT.key, ts)
     }
     reader.load(basePath)
   }
@@ -83,12 +83,12 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
         assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 2)
         val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName 
where id=1").head().get(0)
         val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-        val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+        val cdcDataOnly1 = cdcDataFrame(basePath, "000")
         cdcDataOnly1.show(false)
         assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
 
         spark.sql(s"delete from $tableName where id = 1")
-        val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+        val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
         assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
         assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 1)
         assert(!spark.sql(s"select _hoodie_file_name from 
$tableName").head().get(0).equals(fgForID1))
@@ -138,7 +138,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
 
           spark.sql(s"insert into $tableName values (1, 'a1', 11, 1000), (2, 
'a2', 12, 1000), (3, 'a3', 13, 1000)")
           val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          val cdcDataOnly1 = cdcDataFrame(basePath, "000")
           cdcDataOnly1.show(false)
           assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
 
@@ -146,7 +146,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
           val commitTime2 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
           // Here we use `commitTime1` to query the change data in commit 2,
           // because `commitTime2` may be the ts of the compaction 
operation, not the write operation.
-          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
           cdcDataOnly2.show(false)
           assertCDCOpCnt(cdcDataOnly2, 0, 1, 0)
 
@@ -168,13 +168,13 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
 
           spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id 
= 2")
           val commitTime3 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong)
+          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2)
           cdcDataOnly3.show(false)
           assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
 
           spark.sql(s"delete from $tableName where id = 3")
           val commitTime4 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong)
+          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3)
           cdcDataOnly4.show(false)
           assertCDCOpCnt(cdcDataOnly4, 0, 0, 1)
 
@@ -192,8 +192,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                | when matched then update set id = s0.id, name = s0.name, 
price = s0.price, ts = s0.ts
                | when not matched then insert *
         """.stripMargin)
-          val commitTime5 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong)
+          val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4)
           cdcDataOnly5.show(false)
           assertCDCOpCnt(cdcDataOnly5, 1, 1, 0)
 
@@ -215,7 +214,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
             Seq("i", 4, null, null, "a4", 14)
           )
 
-          val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          val totalCdcData = cdcDataFrame(basePath, "000")
           assertCDCOpCnt(totalCdcData, 4, 3, 1)
         }
       }
@@ -266,19 +265,19 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                | (3, 'a3', 13, 1000, '2022')
         """.stripMargin)
           val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          val cdcDataOnly1 = cdcDataFrame(basePath, "000")
           cdcDataOnly1.show(false)
           assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
 
           spark.sql(s"insert overwrite table $tableName partition (pt = 
'2021') values (1, 'a1_v2', 11, 1100)")
           val commitTime2 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
           cdcDataOnly2.show(false)
           assertCDCOpCnt(cdcDataOnly2, 1, 0, 1)
 
           spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id 
= 2")
           val commitTime3 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2)
           cdcDataOnly3.show(false)
           assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
 
@@ -296,12 +295,11 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                | when matched then update set id = s0.id, name = s0.name, 
price = s0.price, ts = s0.ts, pt = s0.pt
                | when not matched then insert *
         """.stripMargin)
-          val commitTime4 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3)
           cdcDataOnly4.show(false)
           assertCDCOpCnt(cdcDataOnly4, 1, 1, 0)
 
-          val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          val totalCdcData = cdcDataFrame(basePath, "000")
           assertCDCOpCnt(totalCdcData, 5, 2, 1)
         }
       }
@@ -350,19 +348,19 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                | (3, 'a3', 13, 1000, '2022')
       """.stripMargin)
           val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          val cdcDataOnly1 = cdcDataFrame(basePath, "000")
           cdcDataOnly1.show(false)
           assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
 
           spark.sql(s"insert overwrite table $tableName partition (pt = 
'2021') values (1, 'a1_v2', 11, 1100)")
           val commitTime2 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
           cdcDataOnly2.show(false)
           assertCDCOpCnt(cdcDataOnly2, 1, 0, 1)
 
           spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id 
= 2")
           val commitTime3 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2)
           cdcDataOnly3.show(false)
           assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
 
@@ -377,12 +375,11 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                | on s0.id = $tableName.id
                | when matched then update set id = s0.id, name = s0.name, ts = 
s0.ts, pt = s0.pt
       """.stripMargin)
-          val commitTime4 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
-          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3)
           cdcDataOnly4.show(false)
           assertCDCOpCnt(cdcDataOnly4, 0, 1, 0)
 
-          val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          val totalCdcData = cdcDataFrame(basePath, "000")
           assertCDCOpCnt(totalCdcData, 4, 2, 1)
         }
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
index f6fcab7ca985..abf0b98ac2c3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
@@ -193,8 +193,8 @@ class TestCopyToTableProcedure extends 
HoodieSparkProcedureTestBase {
 
       // mark startCompletionTime
       val fs = HadoopFSUtils.getFs(tablePath, 
spark.sessionState.newHadoopConf())
-      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
       val startCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
       spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
       val endCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 6e82a856a91a..a73e7412214c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
-import 
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import 
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
 import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext;
@@ -287,18 +287,19 @@ public class HoodieIncrSource extends RowSource {
           .filter(String.format("%s IN ('%s')", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
               String.join("','", instantTimeList)));
     } else {
-      // normal incremental query
-      TimelineLayout layout = 
TimelineLayout.fromVersion(queryContext.getActiveTimeline().getTimelineLayoutVersion());
-      String inclusiveStartCompletionTime = queryContext.getInstants().stream()
-          .min(layout.getInstantComparator().completionTimeOrderedComparator())
+      String exclusiveStartCompletionTime = 
analyzer.getStartCompletionTime().isPresent()
+          ? analyzer.getStartCompletionTime().get()
+          : String.valueOf(Long.parseLong(queryContext.getInstants().stream()
+          
.min(TimelineLayout.fromVersion(queryContext.getActiveTimeline().getTimelineLayoutVersion())
+              .getInstantComparator().completionTimeOrderedComparator())
           .map(HoodieInstant::getCompletionTime)
-          .get();
-
+          .get()) - 1);
+      // normal incremental query
       source = reader
           .options(readOpts)
           .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
           .option(INCREMENTAL_READ_TABLE_VERSION().key(), 
HoodieTableVersion.EIGHT.versionCode())
-          .option(START_COMMIT().key(), inclusiveStartCompletionTime)
+          .option(START_COMMIT().key(), exclusiveStartCompletionTime)
           .option(END_COMMIT().key(), endCompletionTime)
           .option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
               props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),

Reply via email to