This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e0513d1585a [MINOR] Incremental and Time Travel Query Testing (#13934)
1e0513d1585a is described below

commit 1e0513d1585aa1b39939770909733dbbf868451d
Author: Rahil C <[email protected]>
AuthorDate: Sat Sep 20 07:15:24 2025 -0700

    [MINOR] Incremental and Time Travel Query Testing (#13934)
---
 .../apache/hudi/functional/TestCOWDataSource.scala | 283 +++++++++++++++++++++
 .../apache/hudi/functional/TestMORDataSource.scala | 268 +++++++++++++++++++
 2 files changed, 551 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 0058bac9937d..6f56f466bc9e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -2141,6 +2141,289 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(count, 0)
   }
 
+  /**
+   * Test incremental queries and time travel queries with event time ordering.
+   *
+   * This test validates:
+   * 1. Event time ordering behavior (updates with lower timestamps are 
ignored)
+   * 2. Delete operations using _hoodie_is_deleted column
+   * 3. Time travel queries showing correct historical state
+   * 4. Incremental queries returning changes within specified commit ranges
+   * 5. Version-specific behavior differences between v6 and v9 tables
+   *
+   * Key version differences:
+   * - v6: Uses requestedTime for commits, open_close incremental ranges
+   * - v9: Uses completionTime for commits, close_close incremental ranges
+   */
+  @ParameterizedTest
+  @CsvSource(Array("6", "9"))
+  def testIncrementalAndTimeTravelWithEventTimeOrdering(tableVersion: String): 
Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    // Configuration with event time ordering enabled
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp", // Required for 
event time ordering
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> tableVersion,
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "" // non-partitioned
+    )
+
+    // Helper method to get commit time based on table version
+    // v6 uses requestedTime, v9+ uses completionTime
+    def getCommitTime(tableVersion: String): String = {
+      metaClient.reloadActiveTimeline()
+      val lastInstant = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
+      if (tableVersion == "6") {
+        lastInstant.requestedTime
+      } else {
+        lastInstant.getCompletionTime
+      }
+    }
+
+    // Helper method to write data and return version-appropriate commit time
+    def writeBatch(df: DataFrame, opts: Map[String, String], tableVersion: 
String, mode: SaveMode = SaveMode.Append): String = {
+      df.write.format("hudi").options(opts).mode(mode).save(basePath)
+      // Initialize metaClient after first write if it doesn't exist
+      if (metaClient == null) {
+        metaClient = createMetaClient(spark, basePath)
+      }
+      getCommitTime(tableVersion)
+    }
+
+    // Helper method to adjust start time for incremental queries based on 
version
+    // v6: open_close range (START exclusive, END inclusive)
+    // v9: close_close range (both START and END inclusive)
+    def getIncrementalStartTime(commitTime: String, tableVersion: String): 
String = {
+      if (tableVersion == "6") {
+        // v6: open_close - need to use time just before to include the commit
+        (commitTime.toLong - 1).toString
+      } else {
+        // v9: close_close - use the actual commit time
+        commitTime
+      }
+    }
+
+    // Commit c1 - Initial Insert (10 records with timestamp 1000)
+    val df1 = Seq(
+      (1, "val1", 1000L, false),
+      (2, "val2", 1000L, false),
+      (3, "val3", 1000L, false),
+      (4, "val4", 1000L, false),
+      (5, "val5", 1000L, false),
+      (6, "val6", 1000L, false),
+      (7, "val7", 1000L, false),
+      (8, "val8", 1000L, false),
+      (9, "val9", 1000L, false),
+      (10, "val10", 1000L, false)
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit1 = writeBatch(df1, commonOpts, tableVersion, SaveMode.Overwrite)
+
+    // Commit c2 - Updates with mixed ordering values and deletes
+    // Tests event time ordering: higher timestamps should win, lower should 
be ignored
+    val df2 = Seq(
+      // Updates with higher timestamps (should win over original records)
+      (1, "val1_updated_high", 2000L, false),
+      (2, "val2_updated_high", 2000L, false),
+      (3, "val3_updated_high", 2000L, false),
+      // Updates with lower timestamps (should be ignored due to event time 
ordering)
+      (4, "val4_updated_low", 500L, false),
+      (5, "val5_updated_low", 500L, false),
+      // Deletes using _hoodie_is_deleted column
+      (9, "val9", 2000L, true),
+      (10, "val10", 2000L, true)
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit2 = writeBatch(df2, commonOpts, tableVersion)
+
+    metaClient.reload()
+    val currentTableVersion = 
metaClient.getTableConfig.getTableVersion.versionCode()
+    assert(currentTableVersion == tableVersion.toInt,
+      s"Table version should remain $tableVersion but found 
$currentTableVersion after second write")
+
+    // Commit c3 - New Inserts (3 new records)
+    val df3 = Seq(
+      (11, "val11", 3000L, false),
+      (12, "val12", 3000L, false),
+      (13, "val13", 3000L, false)
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit3 = writeBatch(df3, commonOpts, tableVersion)
+
+    // Commit c4 - More Updates (should override previous values)
+    val df4 = Seq(
+      (2, "val2_updated_again", 4000L, false), // Update existing record
+      (4, "val4_updated_high", 4000L, false),  // This should now override the 
original (higher than 1000)
+      (6, "val6_updated", 4000L, false)        // Update another record
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit4 = writeBatch(df4, commonOpts, tableVersion)
+
+    // Commit c5 - Final Delete
+    val df5 = Seq(
+      (7, "val7", 5000L, true) // Delete record 7
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit5 = writeBatch(df5, commonOpts, tableVersion)
+
+    // Commit c6 - Test deletes with lower ordering values (should be ignored)
+    val df6 = Seq(
+      (1, "val1", 500L, true),  // Attempt to delete record 1 with lower 
timestamp
+      (11, "val11", 2000L, true), // Attempt to delete record 11 with lower 
timestamp
+      (13, "val13", 1000L, true)  // Attempt to delete record 13 with lower 
timestamp
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit6 = writeBatch(df6, commonOpts, tableVersion)
+
+    // Time Travel Query - Query state as of c2
+    // Should show: 8 records (10 original - 2 deletes)
+    // Records 1,2,3 should have updated values (higher timestamps)
+    // Records 4,5 should have original values (lower timestamp updates 
ignored)
+    val timeTravelDf2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit2)
+      .load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    val expectedC2 = Array(
+      Row(1, "val1_updated_high", 2000L), // Updated (higher timestamp)
+      Row(2, "val2_updated_high", 2000L), // Updated (higher timestamp)
+      Row(3, "val3_updated_high", 2000L), // Updated (higher timestamp)
+      Row(4, "val4", 1000L),              // Original value kept (lower 
timestamp update ignored)
+      Row(5, "val5", 1000L),              // Original value kept (lower 
timestamp update ignored)
+      Row(6, "val6", 1000L),              // Original value
+      Row(7, "val7", 1000L),              // Original value
+      Row(8, "val8", 1000L)               // Original value
+      // Records 9 and 10 are deleted
+    )
+    assertEquals(expectedC2.length, timeTravelDf2.length)
+    expectedC2.zip(timeTravelDf2).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+
+    // Time Travel Query - Query state as of c4
+    // Should show: 11 records (8 from c2 + 3 new from c3, with updates from 
c4)
+    val timeTravelDf4 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit4)
+      .load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    val expectedC4 = Array(
+      Row(1, "val1_updated_high", 2000L), // From c2
+      Row(2, "val2_updated_again", 4000L), // Updated in c4
+      Row(3, "val3_updated_high", 2000L), // From c2
+      Row(4, "val4_updated_high", 4000L), // Finally updated in c4 (higher 
than original)
+      Row(5, "val5", 1000L),              // Still original value
+      Row(6, "val6_updated", 4000L),      // Updated in c4
+      Row(7, "val7", 1000L),              // Original value (deleted later in 
c5)
+      Row(8, "val8", 1000L),              // Original value
+      Row(11, "val11", 3000L),            // New record from c3
+      Row(12, "val12", 3000L),            // New record from c3
+      Row(13, "val13", 3000L)             // New record from c3
+    )
+    assertEquals(expectedC4.length, timeTravelDf4.length)
+    expectedC4.zip(timeTravelDf4).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+
+    // Incremental Query (c1, c3) - Multi-commit range
+    // Note: Incremental query range semantics differ by table version:
+    // - v6: Uses open_close range (START exclusive, END inclusive) so for now 
doing a commit - 1 on start time
+    // - v9: Uses close_close range (both START and END inclusive)
+    val incrementalDf1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit1, tableVersion))
+      .option(DataSourceReadOptions.END_COMMIT.key, commit3)
+      .load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    // Expected records for incremental query (c1, c3):
+    // Should include all changes from c1, c2, and c3
+    // - c1: Initial inserts 1-10
+    // - c2: Updates to 1,2,3 (higher timestamps), 4,5 (lower timestamps 
ignored but still appear as touched), deletes 9,10
+    // - c3: New inserts 11,12,13
+    val expectedIncremental1 = Array(
+      Row(1, "val1_updated_high", 2000L),  // Original from c1, updated in c2
+      Row(2, "val2_updated_high", 2000L),  // Original from c1, updated in c2
+      Row(3, "val3_updated_high", 2000L),  // Original from c1, updated in c2
+      Row(4, "val4", 1000L),                // Original from c1, touched in c2 
but original value kept
+      Row(5, "val5", 1000L),                // Original from c1, touched in c2 
but original value kept
+      Row(6, "val6", 1000L),                // Original from c1
+      Row(7, "val7", 1000L),                // Original from c1
+      Row(8, "val8", 1000L),                // Original from c1
+      Row(11, "val11", 3000L),              // New insert from c3
+      Row(12, "val12", 3000L),              // New insert from c3
+      Row(13, "val13", 3000L)               // New insert from c3
+      // Records 9,10 deleted in c2 won't appear in incremental results
+    )
+    assertEquals(expectedIncremental1.length, incrementalDf1.length,
+      s"Incremental query (c1,c3) should return ${expectedIncremental1.length} 
records")
+    expectedIncremental1.zip(incrementalDf1).foreach { case (expected, actual) 
=>
+      assertEquals(expected, actual)
+    }
+
+    // Incremental Query (c3, latest) - Open-ended range
+    // Should include changes from c3, c4 and c5
+    val incrementalDf2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit3, tableVersion))
+      .load(basePath) // No END_COMMIT means up to latest
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    // Expected records for incremental query (c3, latest):
+    // Should include changes from c3, c4 and c5
+    // - c3: New inserts 11,12,13
+    // - c4: Updates to 2,4,6
+    // - c5: Delete 7 (may not appear in results depending on configuration)
+    val expectedIncremental2 = Array(
+      Row(2, "val2_updated_again", 4000L), // Updated in c4
+      Row(4, "val4_updated_high", 4000L),  // Updated in c4
+      Row(6, "val6_updated", 4000L),       // Updated in c4
+      Row(11, "val11", 3000L),             // New insert from c3
+      Row(12, "val12", 3000L),             // New insert from c3
+      Row(13, "val13", 3000L)              // New insert from c3
+      // Record 7 deleted in c5 typically won't appear in incremental results
+    )
+    assertEquals(expectedIncremental2.length, incrementalDf2.length,
+      s"Incremental query (c3,latest) should return 
${expectedIncremental2.length} records")
+    expectedIncremental2.zip(incrementalDf2).foreach { case (expected, actual) 
=>
+      assertEquals(expected, actual)
+    }
+
+    // Final Snapshot Validation - Verify final state after all commits
+    // Expected: 10 records (11 after c3, minus 1 delete in c5)
+    // c6 deletes with lower ordering values should be ignored due to event 
time ordering
+    val finalDf = spark.read.format("hudi").load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    val expectedFinal = Array(
+      Row(1, "val1_updated_high", 2000L), // From c2
+      Row(2, "val2_updated_again", 4000L), // From c4
+      Row(3, "val3_updated_high", 2000L), // From c2
+      Row(4, "val4_updated_high", 4000L), // From c4
+      Row(5, "val5", 1000L),              // Original (low timestamp updates 
ignored)
+      Row(6, "val6_updated", 4000L),      // From c4
+      // Row 7 deleted in c5
+      Row(8, "val8", 1000L),              // Original
+      // Rows 9,10 deleted in c2
+      Row(11, "val11", 3000L),            // From c3
+      Row(12, "val12", 3000L),            // From c3
+      Row(13, "val13", 3000L)             // From c3
+    )
+    assertEquals(expectedFinal.length, finalDf.length, "Final snapshot should 
have correct number of records")
+    expectedFinal.zip(finalDf).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+  }
+
 }
 
 object TestCOWDataSource {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index f4b7db41efa3..03dcf79b3992 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1940,6 +1940,274 @@ class TestMORDataSource extends 
HoodieSparkClientTestBase with SparkDatasetMixin
     }
   }
 
+  @ParameterizedTest
+  @CsvSource(Array("6", "9"))
+  def testIncrementalAndTimeTravelWithEventTimeOrdering(tableVersion: String): 
Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    // Configuration with event time ordering enabled
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp", // Required for 
event time ordering
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> tableVersion,
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "" // non-partitioned
+    )
+
+    // Helper method to get commit time based on table version
+    // v6 uses requestedTime, v9+ uses completionTime
+    def getCommitTime(tableVersion: String): String = {
+      metaClient.reloadActiveTimeline()
+      val lastInstant = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
+      if (tableVersion == "6") {
+        lastInstant.requestedTime
+      } else {
+        lastInstant.getCompletionTime
+      }
+    }
+
+    // Helper method to write data and return version-appropriate commit time
+    def writeBatch(df: DataFrame, opts: Map[String, String], tableVersion: 
String, mode: SaveMode = SaveMode.Append): String = {
+      df.write.format("hudi").options(opts).mode(mode).save(basePath)
+      // Initialize metaClient after first write if it doesn't exist
+      if (metaClient == null) {
+        metaClient = createMetaClient(spark, basePath)
+      }
+      getCommitTime(tableVersion)
+    }
+
+    // Helper method to adjust start time for incremental queries based on 
version
+    // v6: open_close range (START exclusive, END inclusive)
+    // v9: close_close range (both START and END inclusive)
+    def getIncrementalStartTime(commitTime: String, tableVersion: String): 
String = {
+      if (tableVersion == "6") {
+        // v6: open_close - need to use time just before to include the commit
+        (commitTime.toLong - 1).toString
+      } else {
+        // v9: close_close - use the actual commit time
+        commitTime
+      }
+    }
+
+    // Commit c1 - Initial Insert (10 records with timestamp 1000)
+    val df1 = Seq(
+      (1, "val1", 1000L, false),
+      (2, "val2", 1000L, false),
+      (3, "val3", 1000L, false),
+      (4, "val4", 1000L, false),
+      (5, "val5", 1000L, false),
+      (6, "val6", 1000L, false),
+      (7, "val7", 1000L, false),
+      (8, "val8", 1000L, false),
+      (9, "val9", 1000L, false),
+      (10, "val10", 1000L, false)
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit1 = writeBatch(df1, commonOpts, tableVersion, SaveMode.Overwrite)
+
+    // Commit c2 - Updates with mixed ordering values and deletes
+    // Tests event time ordering: higher timestamps should win, lower should 
be ignored
+    val df2 = Seq(
+      // Updates with higher timestamps (should win over original records)
+      (1, "val1_updated_high", 2000L, false),
+      (2, "val2_updated_high", 2000L, false),
+      (3, "val3_updated_high", 2000L, false),
+      // Updates with lower timestamps (should be ignored due to event time 
ordering)
+      (4, "val4_updated_low", 500L, false),
+      (5, "val5_updated_low", 500L, false),
+      // Deletes using _hoodie_is_deleted column
+      (9, "val9", 2000L, true),
+      (10, "val10", 2000L, true)
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit2 = writeBatch(df2, commonOpts, tableVersion)
+
+    metaClient.reload()
+    val currentTableVersion = 
metaClient.getTableConfig.getTableVersion.versionCode()
+    assert(currentTableVersion == tableVersion.toInt,
+      s"Table version should remain $tableVersion but found 
$currentTableVersion after second write")
+
+    // Commit c3 - New Inserts (3 new records)
+    val df3 = Seq(
+      (11, "val11", 3000L, false),
+      (12, "val12", 3000L, false),
+      (13, "val13", 3000L, false)
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit3 = writeBatch(df3, commonOpts, tableVersion)
+
+    // Commit c4 - More Updates (should override previous values)
+    val df4 = Seq(
+      (2, "val2_updated_again", 4000L, false), // Update existing record
+      (4, "val4_updated_high", 4000L, false),  // This should now override the 
original (higher than 1000)
+      (6, "val6_updated", 4000L, false)        // Update another record
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit4 = writeBatch(df4, commonOpts, tableVersion)
+
+    // Commit c5 - Final Delete
+    val df5 = Seq(
+      (7, "val7", 5000L, true) // Delete record 7
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit5 = writeBatch(df5, commonOpts, tableVersion)
+
+    // Commit c6 - Test deletes with lower ordering values (should be ignored)
+    val df6 = Seq(
+      (1, "val1", 500L, true),  // Attempt to delete record 1 with lower 
timestamp
+      (11, "val11", 2000L, true), // Attempt to delete record 11 with lower 
timestamp
+      (13, "val13", 1000L, true)  // Attempt to delete record 13 with lower 
timestamp
+    ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+    val commit6 = writeBatch(df6, commonOpts, tableVersion)
+
+    // Time Travel Query - Query state as of c2
+    // Should show: 8 records (10 original - 2 deletes)
+    // Records 1,2,3 should have updated values (higher timestamps)
+    // Records 4,5 should have original values (lower timestamp updates 
ignored)
+    val timeTravelDf2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit2)
+      .load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    val expectedC2 = Array(
+      Row(1, "val1_updated_high", 2000L), // Updated (higher timestamp)
+      Row(2, "val2_updated_high", 2000L), // Updated (higher timestamp)
+      Row(3, "val3_updated_high", 2000L), // Updated (higher timestamp)
+      Row(4, "val4", 1000L),              // Original value kept (lower 
timestamp update ignored)
+      Row(5, "val5", 1000L),              // Original value kept (lower 
timestamp update ignored)
+      Row(6, "val6", 1000L),              // Original value
+      Row(7, "val7", 1000L),              // Original value
+      Row(8, "val8", 1000L)               // Original value
+      // Records 9 and 10 are deleted
+    )
+    assertEquals(expectedC2.length, timeTravelDf2.length)
+    expectedC2.zip(timeTravelDf2).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+
+    // Time Travel Query - Query state as of c4
+    // Should show: 11 records (8 from c2 + 3 new from c3, with updates from 
c4)
+    val timeTravelDf4 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit4)
+      .load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    val expectedC4 = Array(
+      Row(1, "val1_updated_high", 2000L), // From c2
+      Row(2, "val2_updated_again", 4000L), // Updated in c4
+      Row(3, "val3_updated_high", 2000L), // From c2
+      Row(4, "val4_updated_high", 4000L), // Finally updated in c4 (higher 
than original)
+      Row(5, "val5", 1000L),              // Still original value
+      Row(6, "val6_updated", 4000L),      // Updated in c4
+      Row(7, "val7", 1000L),              // Original value (deleted later in 
c5)
+      Row(8, "val8", 1000L),              // Original value
+      Row(11, "val11", 3000L),            // New record from c3
+      Row(12, "val12", 3000L),            // New record from c3
+      Row(13, "val13", 3000L)             // New record from c3
+    )
+    assertEquals(expectedC4.length, timeTravelDf4.length)
+    expectedC4.zip(timeTravelDf4).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+
+    // Incremental Query (c1, c3) - Multi-commit range
+    // Note: Incremental query range semantics differ by table version:
+    // - v6: Uses open_close range (START exclusive, END inclusive) so for now 
doing a commit - 1 on start time
+    // - v9: Uses close_close range (both START and END inclusive)
+    val incrementalDf1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit1, tableVersion))
+      .option(DataSourceReadOptions.END_COMMIT.key, commit3)
+      .load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    // Expected records for incremental query (c1, c3):
+    // Should include all changes from c1, c2, and c3
+    // - c1: Initial inserts 1-10
+    // - c2: Updates to 1,2,3 (higher timestamps), 4,5 (lower timestamps 
ignored but still appear as touched), deletes 9,10
+    // - c3: New inserts 11,12,13
+    val expectedIncremental1 = Array(
+      Row(1, "val1_updated_high", 2000L),  // Original from c1, updated in c2
+      Row(2, "val2_updated_high", 2000L),  // Original from c1, updated in c2
+      Row(3, "val3_updated_high", 2000L),  // Original from c1, updated in c2
+      Row(4, "val4", 1000L),                // Original from c1, touched in c2 
but original value kept
+      Row(5, "val5", 1000L),                // Original from c1, touched in c2 
but original value kept
+      Row(6, "val6", 1000L),                // Original from c1
+      Row(7, "val7", 1000L),                // Original from c1
+      Row(8, "val8", 1000L),                // Original from c1
+      Row(11, "val11", 3000L),              // New insert from c3
+      Row(12, "val12", 3000L),              // New insert from c3
+      Row(13, "val13", 3000L)               // New insert from c3
+      // Records 9,10 deleted in c2 won't appear in incremental results
+    )
+    assertEquals(expectedIncremental1.length, incrementalDf1.length,
+      s"Incremental query (c1,c3) should return ${expectedIncremental1.length} 
records")
+    expectedIncremental1.zip(incrementalDf1).foreach { case (expected, actual) 
=>
+      assertEquals(expected, actual)
+    }
+
+    // Incremental Query (c3, latest) - Open-ended range
+    // Should include changes from c3, c4 and c5
+    val incrementalDf2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.START_COMMIT.key, 
getIncrementalStartTime(commit3, tableVersion))
+      .load(basePath) // No END_COMMIT means up to latest
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    // Expected records for incremental query (c3, latest):
+    // Should include changes from c3, c4 and c5
+    // - c3: New inserts 11,12,13
+    // - c4: Updates to 2,4,6
+    // - c5: Delete 7 (may not appear in results depending on configuration)
+    val expectedIncremental2 = Array(
+      Row(2, "val2_updated_again", 4000L), // Updated in c4
+      Row(4, "val4_updated_high", 4000L),  // Updated in c4
+      Row(6, "val6_updated", 4000L),       // Updated in c4
+      Row(11, "val11", 3000L),             // New insert from c3
+      Row(12, "val12", 3000L),             // New insert from c3
+      Row(13, "val13", 3000L)              // New insert from c3
+      // Record 7 deleted in c5 typically won't appear in incremental results
+    )
+    assertEquals(expectedIncremental2.length, incrementalDf2.length,
+      s"Incremental query (c3,latest) should return 
${expectedIncremental2.length} records")
+    expectedIncremental2.zip(incrementalDf2).foreach { case (expected, actual) 
=>
+      assertEquals(expected, actual)
+    }
+
+    // Final Snapshot Validation - Verify final state after all commits
+    // Expected: 10 records (11 after c3, minus 1 delete in c5)
+    val finalDf = spark.read.format("hudi").load(basePath)
+      .select("id", "value", "timestamp")
+      .orderBy("id")
+      .collect()
+
+    val expectedFinal = Array(
+      Row(1, "val1_updated_high", 2000L), // From c2
+      Row(2, "val2_updated_again", 4000L), // From c4
+      Row(3, "val3_updated_high", 2000L), // From c2
+      Row(4, "val4_updated_high", 4000L), // From c4
+      Row(5, "val5", 1000L),              // Original (low timestamp updates 
ignored)
+      Row(6, "val6_updated", 4000L),      // From c4
+      // Row 7 deleted in c5
+      Row(8, "val8", 1000L),              // Original
+      // Rows 9,10 deleted in c2
+      Row(11, "val11", 3000L),            // From c3
+      Row(12, "val12", 3000L),            // From c3
+      Row(13, "val13", 3000L)             // From c3
+    )
+    assertEquals(expectedFinal.length, finalDf.length, "Final snapshot should 
have correct number of records")
+    expectedFinal.zip(finalDf).foreach { case (expected, actual) =>
+      assertEquals(expected, actual)
+    }
+  }
+
   private def loadFixtureTable(testBasePath: String, version: 
HoodieTableVersion): HoodieTableMetaClient = {
     val fixtureName = getFixtureName(version, "")
     val resourcePath = s"/upgrade-downgrade-fixtures/mor-tables/$fixtureName"

Reply via email to