This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1e0513d1585a [MINOR] Incremental and Time Travel Query Testing (#13934)
1e0513d1585a is described below
commit 1e0513d1585aa1b39939770909733dbbf868451d
Author: Rahil C <[email protected]>
AuthorDate: Sat Sep 20 07:15:24 2025 -0700
[MINOR] Incremental and Time Travel Query Testing (#13934)
---
.../apache/hudi/functional/TestCOWDataSource.scala | 283 +++++++++++++++++++++
.../apache/hudi/functional/TestMORDataSource.scala | 268 +++++++++++++++++++
2 files changed, 551 insertions(+)
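For reference, a minimal sketch of the two read paths these new tests exercise, using only the Hudi Spark datasource options that appear in the diff below; spark, basePath, commitTime, startCommit and endCommit are placeholders, not identifiers from this commit:

    import org.apache.hudi.DataSourceReadOptions

    // Time travel: snapshot of the table as of a given instant (placeholder commitTime).
    val asOfDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commitTime)
      .load(basePath)

    // Incremental: records changed in the (start, end] range for v6 tables,
    // or [start, end] for v9 tables, per the range semantics noted in the tests.
    val incrementalDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.START_COMMIT.key, startCommit)
      .option(DataSourceReadOptions.END_COMMIT.key, endCommit) // omit END_COMMIT to read up to the latest commit
      .load(basePath)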
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 0058bac9937d..6f56f466bc9e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -2141,6 +2141,289 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(count, 0)
}
+ /**
+ * Test incremental queries and time travel queries with event time ordering.
+ *
+ * This test validates:
+ * 1. Event time ordering behavior (updates with lower timestamps are ignored)
+ * 2. Delete operations using _hoodie_is_deleted column
+ * 3. Time travel queries showing correct historical state
+ * 4. Incremental queries returning changes within specified commit ranges
+ * 5. Version-specific behavior differences between v6 and v9 tables
+ *
+ * Key version differences:
+ * - v6: Uses requestedTime for commits, open_close incremental ranges
+ * - v9: Uses completionTime for commits, close_close incremental ranges
+ */
+ @ParameterizedTest
+ @CsvSource(Array("6", "9"))
+ def testIncrementalAndTimeTravelWithEventTimeOrdering(tableVersion: String): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ // Configuration with event time ordering enabled
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp", // Required for event time ordering
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key -> tableVersion,
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "" // non-partitioned
+ )
+
+ // Helper method to get commit time based on table version
+ // v6 uses requestedTime, v9+ uses completionTime
+ def getCommitTime(tableVersion: String): String = {
+ metaClient.reloadActiveTimeline()
+ val lastInstant = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableVersion == "6") {
+ lastInstant.requestedTime
+ } else {
+ lastInstant.getCompletionTime
+ }
+ }
+
+ // Helper method to write data and return version-appropriate commit time
+ def writeBatch(df: DataFrame, opts: Map[String, String], tableVersion: String, mode: SaveMode = SaveMode.Append): String = {
+ df.write.format("hudi").options(opts).mode(mode).save(basePath)
+ // Initialize metaClient after first write if it doesn't exist
+ if (metaClient == null) {
+ metaClient = createMetaClient(spark, basePath)
+ }
+ getCommitTime(tableVersion)
+ }
+
+ // Helper method to adjust start time for incremental queries based on version
+ // v6: open_close range (START exclusive, END inclusive)
+ // v9: close_close range (both START and END inclusive)
+ def getIncrementalStartTime(commitTime: String, tableVersion: String): String = {
+ if (tableVersion == "6") {
+ // v6: open_close - need to use time just before to include the commit
+ (commitTime.toLong - 1).toString
+ } else {
+ // v9: close_close - use the actual commit time
+ commitTime
+ }
+ }
+
+ // Commit c1 - Initial Insert (10 records with timestamp 1000)
+ val df1 = Seq(
+ (1, "val1", 1000L, false),
+ (2, "val2", 1000L, false),
+ (3, "val3", 1000L, false),
+ (4, "val4", 1000L, false),
+ (5, "val5", 1000L, false),
+ (6, "val6", 1000L, false),
+ (7, "val7", 1000L, false),
+ (8, "val8", 1000L, false),
+ (9, "val9", 1000L, false),
+ (10, "val10", 1000L, false)
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit1 = writeBatch(df1, commonOpts, tableVersion, SaveMode.Overwrite)
+
+ // Commit c2 - Updates with mixed ordering values and deletes
+ // Tests event time ordering: higher timestamps should win, lower should be ignored
+ val df2 = Seq(
+ // Updates with higher timestamps (should win over original records)
+ (1, "val1_updated_high", 2000L, false),
+ (2, "val2_updated_high", 2000L, false),
+ (3, "val3_updated_high", 2000L, false),
+ // Updates with lower timestamps (should be ignored due to event time ordering)
+ (4, "val4_updated_low", 500L, false),
+ (5, "val5_updated_low", 500L, false),
+ // Deletes using _hoodie_is_deleted column
+ (9, "val9", 2000L, true),
+ (10, "val10", 2000L, true)
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit2 = writeBatch(df2, commonOpts, tableVersion)
+
+ metaClient.reload()
+ val currentTableVersion = metaClient.getTableConfig.getTableVersion.versionCode()
+ assert(currentTableVersion == tableVersion.toInt,
+ s"Table version should remain $tableVersion but found $currentTableVersion after second write")
+
+ // Commit c3 - New Inserts (3 new records)
+ val df3 = Seq(
+ (11, "val11", 3000L, false),
+ (12, "val12", 3000L, false),
+ (13, "val13", 3000L, false)
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit3 = writeBatch(df3, commonOpts, tableVersion)
+
+ // Commit c4 - More Updates (should override previous values)
+ val df4 = Seq(
+ (2, "val2_updated_again", 4000L, false), // Update existing record
+ (4, "val4_updated_high", 4000L, false), // This should now override the
original (higher than 1000)
+ (6, "val6_updated", 4000L, false) // Update another record
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit4 = writeBatch(df4, commonOpts, tableVersion)
+
+ // Commit c5 - Final Delete
+ val df5 = Seq(
+ (7, "val7", 5000L, true) // Delete record 7
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit5 = writeBatch(df5, commonOpts, tableVersion)
+
+ // Commit c6 - Test deletes with lower ordering values (should be ignored)
+ val df6 = Seq(
+ (1, "val1", 500L, true), // Attempt to delete record 1 with lower
timestamp
+ (11, "val11", 2000L, true), // Attempt to delete record 11 with lower
timestamp
+ (13, "val13", 1000L, true) // Attempt to delete record 13 with lower
timestamp
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit6 = writeBatch(df6, commonOpts, tableVersion)
+
+ // Time Travel Query - Query state as of c2
+ // Should show: 8 records (10 original - 2 deletes)
+ // Records 1,2,3 should have updated values (higher timestamps)
+ // Records 4,5 should have original values (lower timestamp updates ignored)
+ val timeTravelDf2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit2)
+ .load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ val expectedC2 = Array(
+ Row(1, "val1_updated_high", 2000L), // Updated (higher timestamp)
+ Row(2, "val2_updated_high", 2000L), // Updated (higher timestamp)
+ Row(3, "val3_updated_high", 2000L), // Updated (higher timestamp)
+ Row(4, "val4", 1000L), // Original value kept (lower
timestamp update ignored)
+ Row(5, "val5", 1000L), // Original value kept (lower
timestamp update ignored)
+ Row(6, "val6", 1000L), // Original value
+ Row(7, "val7", 1000L), // Original value
+ Row(8, "val8", 1000L) // Original value
+ // Records 9 and 10 are deleted
+ )
+ assertEquals(expectedC2.length, timeTravelDf2.length)
+ expectedC2.zip(timeTravelDf2).foreach { case (expected, actual) =>
+ assertEquals(expected, actual)
+ }
+
+ // Time Travel Query - Query state as of c4
+ // Should show: 11 records (8 from c2 + 3 new from c3, with updates from c4)
+ val timeTravelDf4 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit4)
+ .load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ val expectedC4 = Array(
+ Row(1, "val1_updated_high", 2000L), // From c2
+ Row(2, "val2_updated_again", 4000L), // Updated in c4
+ Row(3, "val3_updated_high", 2000L), // From c2
+ Row(4, "val4_updated_high", 4000L), // Finally updated in c4 (higher
than original)
+ Row(5, "val5", 1000L), // Still original value
+ Row(6, "val6_updated", 4000L), // Updated in c4
+ Row(7, "val7", 1000L), // Original value (deleted later in
c5)
+ Row(8, "val8", 1000L), // Original value
+ Row(11, "val11", 3000L), // New record from c3
+ Row(12, "val12", 3000L), // New record from c3
+ Row(13, "val13", 3000L) // New record from c3
+ )
+ assertEquals(expectedC4.length, timeTravelDf4.length)
+ expectedC4.zip(timeTravelDf4).foreach { case (expected, actual) =>
+ assertEquals(expected, actual)
+ }
+
+ // Incremental Query (c1, c3) - Multi-commit range
+ // Note: Incremental query range semantics differ by table version:
+ // - v6: Uses open_close range (START exclusive, END inclusive), so the start time is adjusted to commit - 1 for now
+ // - v9: Uses close_close range (both START and END inclusive)
+ val incrementalDf1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, getIncrementalStartTime(commit1, tableVersion))
+ .option(DataSourceReadOptions.END_COMMIT.key, commit3)
+ .load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ // Expected records for incremental query (c1, c3):
+ // Should include all changes from c1, c2, and c3
+ // - c1: Initial inserts 1-10
+ // - c2: Updates to 1,2,3 (higher timestamps), 4,5 (lower timestamps ignored but still appear as touched), deletes 9,10
+ // - c3: New inserts 11,12,13
+ val expectedIncremental1 = Array(
+ Row(1, "val1_updated_high", 2000L), // Original from c1, updated in c2
+ Row(2, "val2_updated_high", 2000L), // Original from c1, updated in c2
+ Row(3, "val3_updated_high", 2000L), // Original from c1, updated in c2
+ Row(4, "val4", 1000L), // Original from c1, touched in c2
but original value kept
+ Row(5, "val5", 1000L), // Original from c1, touched in c2
but original value kept
+ Row(6, "val6", 1000L), // Original from c1
+ Row(7, "val7", 1000L), // Original from c1
+ Row(8, "val8", 1000L), // Original from c1
+ Row(11, "val11", 3000L), // New insert from c3
+ Row(12, "val12", 3000L), // New insert from c3
+ Row(13, "val13", 3000L) // New insert from c3
+ // Records 9,10 deleted in c2 won't appear in incremental results
+ )
+ assertEquals(expectedIncremental1.length, incrementalDf1.length,
+ s"Incremental query (c1,c3) should return ${expectedIncremental1.length}
records")
+ expectedIncremental1.zip(incrementalDf1).foreach { case (expected, actual)
=>
+ assertEquals(expected, actual)
+ }
+
+ // Incremental Query (c3, latest) - Open-ended range
+ // Should include changes from c3, c4 and c5
+ val incrementalDf2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, getIncrementalStartTime(commit3, tableVersion))
+ .load(basePath) // No END_COMMIT means up to latest
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ // Expected records for incremental query (c3, latest):
+ // Should include changes from c3, c4 and c5
+ // - c3: New inserts 11,12,13
+ // - c4: Updates to 2,4,6
+ // - c5: Delete 7 (may not appear in results depending on configuration)
+ val expectedIncremental2 = Array(
+ Row(2, "val2_updated_again", 4000L), // Updated in c4
+ Row(4, "val4_updated_high", 4000L), // Updated in c4
+ Row(6, "val6_updated", 4000L), // Updated in c4
+ Row(11, "val11", 3000L), // New insert from c3
+ Row(12, "val12", 3000L), // New insert from c3
+ Row(13, "val13", 3000L) // New insert from c3
+ // Record 7 deleted in c5 typically won't appear in incremental results
+ )
+ assertEquals(expectedIncremental2.length, incrementalDf2.length,
+ s"Incremental query (c3,latest) should return
${expectedIncremental2.length} records")
+ expectedIncremental2.zip(incrementalDf2).foreach { case (expected, actual)
=>
+ assertEquals(expected, actual)
+ }
+
+ // Final Snapshot Validation - Verify final state after all commits
+ // Expected: 10 records (11 after c3, minus 1 delete in c5)
+ // c6 deletes with lower ordering values should be ignored due to event time ordering
+ val finalDf = spark.read.format("hudi").load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ val expectedFinal = Array(
+ Row(1, "val1_updated_high", 2000L), // From c2
+ Row(2, "val2_updated_again", 4000L), // From c4
+ Row(3, "val3_updated_high", 2000L), // From c2
+ Row(4, "val4_updated_high", 4000L), // From c4
+ Row(5, "val5", 1000L), // Original (low timestamp updates
ignored)
+ Row(6, "val6_updated", 4000L), // From c4
+ // Row 7 deleted in c5
+ Row(8, "val8", 1000L), // Original
+ // Rows 9,10 deleted in c2
+ Row(11, "val11", 3000L), // From c3
+ Row(12, "val12", 3000L), // From c3
+ Row(13, "val13", 3000L) // From c3
+ )
+ assertEquals(expectedFinal.length, finalDf.length, "Final snapshot should have correct number of records")
+ expectedFinal.zip(finalDf).foreach { case (expected, actual) =>
+ assertEquals(expected, actual)
+ }
+ }
+
}
object TestCOWDataSource {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index f4b7db41efa3..03dcf79b3992 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1940,6 +1940,274 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
}
}
+ @ParameterizedTest
+ @CsvSource(Array("6", "9"))
+ def testIncrementalAndTimeTravelWithEventTimeOrdering(tableVersion: String): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ // Configuration with event time ordering enabled
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp", // Required for event time ordering
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key -> tableVersion,
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "" // non-partitioned
+ )
+
+ // Helper method to get commit time based on table version
+ // v6 uses requestedTime, v9+ uses completionTime
+ def getCommitTime(tableVersion: String): String = {
+ metaClient.reloadActiveTimeline()
+ val lastInstant = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableVersion == "6") {
+ lastInstant.requestedTime
+ } else {
+ lastInstant.getCompletionTime
+ }
+ }
+
+ // Helper method to write data and return version-appropriate commit time
+ def writeBatch(df: DataFrame, opts: Map[String, String], tableVersion: String, mode: SaveMode = SaveMode.Append): String = {
+ df.write.format("hudi").options(opts).mode(mode).save(basePath)
+ // Initialize metaClient after first write if it doesn't exist
+ if (metaClient == null) {
+ metaClient = createMetaClient(spark, basePath)
+ }
+ getCommitTime(tableVersion)
+ }
+
+ // Helper method to adjust start time for incremental queries based on version
+ // v6: open_close range (START exclusive, END inclusive)
+ // v9: close_close range (both START and END inclusive)
+ def getIncrementalStartTime(commitTime: String, tableVersion: String): String = {
+ if (tableVersion == "6") {
+ // v6: open_close - need to use time just before to include the commit
+ (commitTime.toLong - 1).toString
+ } else {
+ // v9: close_close - use the actual commit time
+ commitTime
+ }
+ }
+
+ // Commit c1 - Initial Insert (10 records with timestamp 1000)
+ val df1 = Seq(
+ (1, "val1", 1000L, false),
+ (2, "val2", 1000L, false),
+ (3, "val3", 1000L, false),
+ (4, "val4", 1000L, false),
+ (5, "val5", 1000L, false),
+ (6, "val6", 1000L, false),
+ (7, "val7", 1000L, false),
+ (8, "val8", 1000L, false),
+ (9, "val9", 1000L, false),
+ (10, "val10", 1000L, false)
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit1 = writeBatch(df1, commonOpts, tableVersion, SaveMode.Overwrite)
+
+ // Commit c2 - Updates with mixed ordering values and deletes
+ // Tests event time ordering: higher timestamps should win, lower should be ignored
+ val df2 = Seq(
+ // Updates with higher timestamps (should win over original records)
+ (1, "val1_updated_high", 2000L, false),
+ (2, "val2_updated_high", 2000L, false),
+ (3, "val3_updated_high", 2000L, false),
+ // Updates with lower timestamps (should be ignored due to event time ordering)
+ (4, "val4_updated_low", 500L, false),
+ (5, "val5_updated_low", 500L, false),
+ // Deletes using _hoodie_is_deleted column
+ (9, "val9", 2000L, true),
+ (10, "val10", 2000L, true)
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit2 = writeBatch(df2, commonOpts, tableVersion)
+
+ metaClient.reload()
+ val currentTableVersion = metaClient.getTableConfig.getTableVersion.versionCode()
+ assert(currentTableVersion == tableVersion.toInt,
+ s"Table version should remain $tableVersion but found $currentTableVersion after second write")
+
+ // Commit c3 - New Inserts (3 new records)
+ val df3 = Seq(
+ (11, "val11", 3000L, false),
+ (12, "val12", 3000L, false),
+ (13, "val13", 3000L, false)
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit3 = writeBatch(df3, commonOpts, tableVersion)
+
+ // Commit c4 - More Updates (should override previous values)
+ val df4 = Seq(
+ (2, "val2_updated_again", 4000L, false), // Update existing record
+ (4, "val4_updated_high", 4000L, false), // This should now override the
original (higher than 1000)
+ (6, "val6_updated", 4000L, false) // Update another record
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit4 = writeBatch(df4, commonOpts, tableVersion)
+
+ // Commit c5 - Final Delete
+ val df5 = Seq(
+ (7, "val7", 5000L, true) // Delete record 7
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit5 = writeBatch(df5, commonOpts, tableVersion)
+
+ // Commit c6 - Test deletes with lower ordering values (should be ignored)
+ val df6 = Seq(
+ (1, "val1", 500L, true), // Attempt to delete record 1 with lower
timestamp
+ (11, "val11", 2000L, true), // Attempt to delete record 11 with lower
timestamp
+ (13, "val13", 1000L, true) // Attempt to delete record 13 with lower
timestamp
+ ).toDF("id", "value", "timestamp", "_hoodie_is_deleted")
+ val commit6 = writeBatch(df6, commonOpts, tableVersion)
+
+ // Time Travel Query - Query state as of c2
+ // Should show: 8 records (10 original - 2 deletes)
+ // Records 1,2,3 should have updated values (higher timestamps)
+ // Records 4,5 should have original values (lower timestamp updates ignored)
+ val timeTravelDf2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit2)
+ .load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ val expectedC2 = Array(
+ Row(1, "val1_updated_high", 2000L), // Updated (higher timestamp)
+ Row(2, "val2_updated_high", 2000L), // Updated (higher timestamp)
+ Row(3, "val3_updated_high", 2000L), // Updated (higher timestamp)
+ Row(4, "val4", 1000L), // Original value kept (lower
timestamp update ignored)
+ Row(5, "val5", 1000L), // Original value kept (lower
timestamp update ignored)
+ Row(6, "val6", 1000L), // Original value
+ Row(7, "val7", 1000L), // Original value
+ Row(8, "val8", 1000L) // Original value
+ // Records 9 and 10 are deleted
+ )
+ assertEquals(expectedC2.length, timeTravelDf2.length)
+ expectedC2.zip(timeTravelDf2).foreach { case (expected, actual) =>
+ assertEquals(expected, actual)
+ }
+
+ // Time Travel Query - Query state as of c4
+ // Should show: 11 records (8 from c2 + 3 new from c3, with updates from c4)
+ val timeTravelDf4 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit4)
+ .load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ val expectedC4 = Array(
+ Row(1, "val1_updated_high", 2000L), // From c2
+ Row(2, "val2_updated_again", 4000L), // Updated in c4
+ Row(3, "val3_updated_high", 2000L), // From c2
+ Row(4, "val4_updated_high", 4000L), // Finally updated in c4 (higher
than original)
+ Row(5, "val5", 1000L), // Still original value
+ Row(6, "val6_updated", 4000L), // Updated in c4
+ Row(7, "val7", 1000L), // Original value (deleted later in
c5)
+ Row(8, "val8", 1000L), // Original value
+ Row(11, "val11", 3000L), // New record from c3
+ Row(12, "val12", 3000L), // New record from c3
+ Row(13, "val13", 3000L) // New record from c3
+ )
+ assertEquals(expectedC4.length, timeTravelDf4.length)
+ expectedC4.zip(timeTravelDf4).foreach { case (expected, actual) =>
+ assertEquals(expected, actual)
+ }
+
+ // Incremental Query (c1, c3) - Multi-commit range
+ // Note: Incremental query range semantics differ by table version:
+ // - v6: Uses open_close range (START exclusive, END inclusive), so the start time is adjusted to commit - 1 for now
+ // - v9: Uses close_close range (both START and END inclusive)
+ val incrementalDf1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, getIncrementalStartTime(commit1, tableVersion))
+ .option(DataSourceReadOptions.END_COMMIT.key, commit3)
+ .load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ // Expected records for incremental query (c1, c3):
+ // Should include all changes from c1, c2, and c3
+ // - c1: Initial inserts 1-10
+ // - c2: Updates to 1,2,3 (higher timestamps), 4,5 (lower timestamps ignored but still appear as touched), deletes 9,10
+ // - c3: New inserts 11,12,13
+ val expectedIncremental1 = Array(
+ Row(1, "val1_updated_high", 2000L), // Original from c1, updated in c2
+ Row(2, "val2_updated_high", 2000L), // Original from c1, updated in c2
+ Row(3, "val3_updated_high", 2000L), // Original from c1, updated in c2
+ Row(4, "val4", 1000L), // Original from c1, touched in c2
but original value kept
+ Row(5, "val5", 1000L), // Original from c1, touched in c2
but original value kept
+ Row(6, "val6", 1000L), // Original from c1
+ Row(7, "val7", 1000L), // Original from c1
+ Row(8, "val8", 1000L), // Original from c1
+ Row(11, "val11", 3000L), // New insert from c3
+ Row(12, "val12", 3000L), // New insert from c3
+ Row(13, "val13", 3000L) // New insert from c3
+ // Records 9,10 deleted in c2 won't appear in incremental results
+ )
+ assertEquals(expectedIncremental1.length, incrementalDf1.length,
+ s"Incremental query (c1,c3) should return ${expectedIncremental1.length}
records")
+ expectedIncremental1.zip(incrementalDf1).foreach { case (expected, actual)
=>
+ assertEquals(expected, actual)
+ }
+
+ // Incremental Query (c3, latest) - Open-ended range
+ // Should include changes from c3, c4 and c5
+ val incrementalDf2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, getIncrementalStartTime(commit3, tableVersion))
+ .load(basePath) // No END_COMMIT means up to latest
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ // Expected records for incremental query (c3, latest):
+ // Should include changes from c3, c4 and c5
+ // - c3: New inserts 11,12,13
+ // - c4: Updates to 2,4,6
+ // - c5: Delete 7 (may not appear in results depending on configuration)
+ val expectedIncremental2 = Array(
+ Row(2, "val2_updated_again", 4000L), // Updated in c4
+ Row(4, "val4_updated_high", 4000L), // Updated in c4
+ Row(6, "val6_updated", 4000L), // Updated in c4
+ Row(11, "val11", 3000L), // New insert from c3
+ Row(12, "val12", 3000L), // New insert from c3
+ Row(13, "val13", 3000L) // New insert from c3
+ // Record 7 deleted in c5 typically won't appear in incremental results
+ )
+ assertEquals(expectedIncremental2.length, incrementalDf2.length,
+ s"Incremental query (c3,latest) should return
${expectedIncremental2.length} records")
+ expectedIncremental2.zip(incrementalDf2).foreach { case (expected, actual)
=>
+ assertEquals(expected, actual)
+ }
+
+ // Final Snapshot Validation - Verify final state after all commits
+ // Expected: 10 records (11 after c3, minus 1 delete in c5)
+ val finalDf = spark.read.format("hudi").load(basePath)
+ .select("id", "value", "timestamp")
+ .orderBy("id")
+ .collect()
+
+ val expectedFinal = Array(
+ Row(1, "val1_updated_high", 2000L), // From c2
+ Row(2, "val2_updated_again", 4000L), // From c4
+ Row(3, "val3_updated_high", 2000L), // From c2
+ Row(4, "val4_updated_high", 4000L), // From c4
+ Row(5, "val5", 1000L), // Original (low timestamp updates
ignored)
+ Row(6, "val6_updated", 4000L), // From c4
+ // Row 7 deleted in c5
+ Row(8, "val8", 1000L), // Original
+ // Rows 9,10 deleted in c2
+ Row(11, "val11", 3000L), // From c3
+ Row(12, "val12", 3000L), // From c3
+ Row(13, "val13", 3000L) // From c3
+ )
+ assertEquals(expectedFinal.length, finalDf.length, "Final snapshot should have correct number of records")
+ expectedFinal.zip(finalDf).foreach { case (expected, actual) =>
+ assertEquals(expected, actual)
+ }
+ }
+
private def loadFixtureTable(testBasePath: String, version: HoodieTableVersion): HoodieTableMetaClient = {
val fixtureName = getFixtureName(version, "")
val resourcePath = s"/upgrade-downgrade-fixtures/mor-tables/$fixtureName"