nsivabalan commented on code in PR #14261:
URL: https://github.com/apache/hudi/pull/14261#discussion_r2561811458


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineProcedure.scala:
##########
@@ -0,0 +1,1286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.HoodieTableVersion
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestShowTimelineProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test show_timeline procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tableLocation = tmp.getCanonicalPath
+      if (HoodieSparkUtils.isSpark3_4) {
+        spark.sql("set spark.sql.defaultColumn.enabled = false")
+      }
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | location '$tableLocation'
+           | tblproperties (
+           |   primaryKey = 'id',
+           |   type = 'cow',
+           |   preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+      spark.sql(s"update $tableName set price = 15 where id = 1")
+
+      val timelineResult = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+
+      assert(timelineResult.length == 3, "Should have 3 timeline entries (2 
inserts + 1 update)")
+
+      val firstRow = timelineResult.head
+      assert(firstRow.length == 8, "Should have 8 columns in result")
+
+      assert(firstRow.getString(0) != null, "instant_time should not be null")
+      assert(firstRow.getString(1) != null, "action should not be null")
+      assert(firstRow.getString(2) != null, "state should not be null")
+      assert(firstRow.getString(6) != null, "timeline_type should not be null")
+
+      timelineResult.foreach { row =>
+        assert(row.getString(6) == "ACTIVE", s"Timeline type should be ACTIVE, 
got: ${row.getString(6)}")
+      }
+
+      val actions = timelineResult.map(_.getString(1)).distinct
+      assert(actions.contains("commit"), "Should have commit actions in 
timeline")
+    }
+  }
+
+  test("Test show_timeline procedure - MoR table with deltacommit") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tableLocation = tmp.getCanonicalPath
+      if (HoodieSparkUtils.isSpark3_4) {
+        spark.sql("set spark.sql.defaultColumn.enabled = false")
+      }
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | location '$tableLocation'
+           | tblproperties (
+           |   primaryKey = 'id',
+           |   type = 'mor',
+           |   preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+
+      val timelineResult = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+
+      assert(timelineResult.length == 2, "Should have 2 timeline entries")
+
+      val actions = timelineResult.map(_.getString(1)).distinct
+      assert(actions.contains("deltacommit"), "Should have deltacommit actions 
in MoR table")
+    }
+  }
+
+  test("Test show_timeline procedure - rollback operations") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tableLocation = tmp.getCanonicalPath
+      if (HoodieSparkUtils.isSpark3_4) {
+        spark.sql("set spark.sql.defaultColumn.enabled = false")
+      }
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | location '$tableLocation'
+           | tblproperties (
+           |   primaryKey = 'id',
+           |   type = 'mor',
+           |   preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+
+      val timelineBeforeRollbackDf = spark.sql(s"call show_timeline(table => 
'$tableName')")
+      timelineBeforeRollbackDf.show(false)
+      val timelineBeforeRollback = timelineBeforeRollbackDf.collect()
+      assert(timelineBeforeRollback.length == 2, "Should have at least 2 
timeline entries")
+
+      val firstCompletedInstant = timelineBeforeRollback.find(_.getString(2) 
== "COMPLETED")
+      assert(firstCompletedInstant.isDefined, "Should have at least one 
completed instant")
+
+      val instantTimeToRollback = firstCompletedInstant.get.getString(0)
+
+      spark.sql(s"call rollback_to_instant(table => '$tableName', instant_time 
=> '$instantTimeToRollback')")
+
+      val timelineResultDf = spark.sql(s"call show_timeline(table => 
'$tableName', showArchived => true)")
+      timelineResultDf.show(false)
+      val timelineResult = timelineResultDf.collect()
+
+      assert(timelineResult.length == 2, "Should have rollback and previous 
deltacommit instance")
+
+      val actions = timelineResult.map(_.getString(1)).distinct
+      assert(actions.contains("rollback"), "Should have rollback actions in 
timeline")
+
+      val rollbackRows = timelineResult.filter(_.getString(1) == "rollback")
+      rollbackRows.foreach { row =>
+        assert(row.getString(7) != null, "rollback_info should not be null for 
rollback operations")
+        assert(row.getString(7).contains("Rolled back"), "rollback_info should 
contain rollback information")
+      }
+    }
+  }
+
+  test("Test show_timeline procedure - pending commit with null completed 
time") {
+    withSQLConf("hoodie.compact.inline" -> "false", 
"hoodie.parquet.max.file.size" -> "10000") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tableLocation = tmp.getCanonicalPath
+        if (HoodieSparkUtils.isSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled = false")
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             |) using hudi
+             | location '$tableLocation'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'mor',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(3, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(4, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(5, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(6, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(7, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(8, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(9, 'a2', 20, 2000)")
+        spark.sql(s"insert into $tableName values(10, 'a2', 20, 2000)")
+        spark.sql(s"update $tableName set price = 15 where id = 1")
+        spark.sql(s"update $tableName set price = 30 where id = 1")
+        spark.sql(s"update $tableName set price = 30 where id = 2")
+        spark.sql(s"update $tableName set price = 30 where id = 3")
+
+        spark.sql(s"call run_compaction(table => '$tableName', op => 
'schedule')")
+        spark.sql(s"call show_compaction(table => '$tableName')").show(false)
+
+        val timelineResultDf = spark.sql(s"call show_timeline(table => 
'$tableName')")
+        timelineResultDf.show(false)
+        val timelineResult = timelineResultDf.collect()
+
+        assert(timelineResult.length == 15, "Should have timeline entries 
including scheduled pending compaction")
+
+        val pendingRows = timelineResult.filter(_.getString(2) == "REQUESTED")
+        assert(pendingRows.length == 1, "Should have 1 requested compaction 
operations")
+        if (pendingRows.nonEmpty) {
+          pendingRows.foreach { row =>
+            assert(row.getString(5) == null, "completed_time should be null 
for REQUESTED state")
+            assert(row.getString(4) == null, "inflight_time should be null for 
REQUESTED state")
+            assert(row.getString(3) != null, "requested_time should not be 
null")
+            assert(row.getString(1) == "compaction", "REQUESTED state should 
be for compaction action")
+          }
+        }
+        val completedRows = timelineResult.filter(_.getString(2) == 
"COMPLETED")
+        assert(completedRows.length == 14, "Should have 14 deltacommit 
completed operations")
+        if (completedRows.nonEmpty) {
+          completedRows.foreach { row =>
+            assert(row.getString(5) != null, "completed_time should not be 
null for COMPLETED state")
+            assert(row.getString(4) != null, "inflight_time should not be null 
for COMPLETED state")
+          }
+        }
+      }
+    }
+  }
+
+  test("Test show_timeline procedure with archived timeline V2") {
+    withSQLConf(
+      "hoodie.keep.min.commits" -> "2",
+      "hoodie.keep.max.commits" -> "3",
+      "hoodie.cleaner.commits.retained" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tableLocation = tmp.getCanonicalPath
+        if (HoodieSparkUtils.isSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled = false")
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             |) using hudi
+             | location '$tableLocation'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'cow',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        // Create multiple commits to trigger archiving
+        for (i <- 1 to 5) {
+          spark.sql(s"insert into $tableName values($i, 'a$i', ${10 * i}, 
${1000 * i})")
+        }
+
+        // Trigger clean to potentially archive commits
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
+
+        // Test show_timeline with showArchived=true (should use 
ArchivedTimelineV2)
+        val timelineResultDf = spark.sql(s"call show_timeline(table => 
'$tableName', showArchived => true, limit => 100)")
+        timelineResultDf.show(false)
+        val timelineResult = timelineResultDf.collect()
+
+        assert(timelineResult.length >= 5, "Should have at least 5 timeline 
entries (commits + clean)")
+
+        // Verify that we can see both active and archived entries
+        val timelineTypes = timelineResult.map(_.getString(6)).distinct
+        assert(timelineTypes.contains("ACTIVE"), "Should have ACTIVE timeline 
entries")
+        // Archived entries may or may not be present depending on archiving 
trigger
+
+        // Verify all entries have required fields
+        timelineResult.foreach { row =>
+          assert(row.getString(0) != null, "instant_time should not be null")
+          assert(row.getString(1) != null, "action should not be null")
+          assert(row.getString(2) != null, "state should not be null")
+        }
+
+        val actions = timelineResult.map(_.getString(1)).distinct
+        assert(actions.contains("commit"), "Should have commit actions in 
timeline")
+      }
+    }
+  }
+
+  test("Test show_timeline procedure with archived timeline V1") {
+    withSQLConf(
+      "hoodie.keep.min.commits" -> "2",
+      "hoodie.keep.max.commits" -> "3",
+      "hoodie.cleaner.commits.retained" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tableLocation = tmp.getCanonicalPath
+        if (HoodieSparkUtils.isSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled = false")
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             |) using hudi
+             | location '$tableLocation'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'mor',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        // Create multiple commits with updates to generate log files for MoR 
table
+        for (i <- 1 to 5) {
+          spark.sql(s"insert into $tableName values($i, 'a$i', ${10 * i}, 
${1000 * i})")
+        }
+        // Add updates to create log files that can be compacted
+        for (i <- 1 to 3) {
+          spark.sql(s"update $tableName set price = ${20 * i} where id = $i")
+        }
+
+        // Downgrade table to version 6 (which uses LAYOUT_VERSION_1, so 
ArchivedTimelineV1)
+        spark.sql(s"call downgrade_table(table => '$tableName', to_version => 
'SIX')")
+
+        // Trigger clean to potentially archive commits
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
+
+        // Test show_timeline with showArchived=true (should use 
ArchivedTimelineV1)
+        val timelineResultDf = spark.sql(s"call show_timeline(table => 
'$tableName', showArchived => true, limit => 100)")
+        timelineResultDf.show(false)
+        val timelineResult = timelineResultDf.collect()
+
+        assert(timelineResult.length >= 8, "Should have at least 8 timeline 
entries (5 inserts + 3 updates + clean)")
+
+        // Verify that we can see both active and archived entries
+        val timelineTypes = timelineResult.map(_.getString(6)).distinct
+        assert(timelineTypes.contains("ACTIVE"), "Should have ACTIVE timeline 
entries")
+        // Archived entries may or may not be present depending on archiving 
trigger
+
+        // Verify all entries have required fields
+        timelineResult.foreach { row =>
+          assert(row.getString(0) != null, "instant_time should not be null")
+          assert(row.getString(1) != null, "action should not be null")
+          assert(row.getString(2) != null, "state should not be null")
+        }
+
+        val actions = timelineResult.map(_.getString(1)).distinct
+        assert(actions.contains("deltacommit"), "Should have deltacommit 
actions in timeline")
+        assert(actions.contains("commit"), "Should have commit actions in 
timeline")
+
+        // Schedule compaction first (creates REQUESTED compaction event)
+        val scheduleResult = spark.sql(s"call run_compaction(op => 'schedule', 
table => '$tableName')")
+          .collect()
+        println(s"Scheduled compaction result: ${scheduleResult.length} 
instants")
+
+        // Check if compaction was scheduled successfully
+        val timelineAfterScheduleDf = spark.sql(s"call show_timeline(table => 
'$tableName')")
+        val timelineAfterSchedule = timelineAfterScheduleDf.collect()
+        val hasRequestedCompaction = timelineAfterSchedule.exists(row =>
+          row.getString(1) == "compaction" && row.getString(2) == "REQUESTED"
+        )
+        println(s"Has REQUESTED compaction: $hasRequestedCompaction")
+
+        // Run compaction (transitions REQUESTED -> INFLIGHT -> COMPLETED as 
COMMIT_ACTION)
+        val runResult = spark.sql(s"call run_compaction(op => 'run', table => 
'$tableName')")
+          .collect()
+        println(s"Run compaction result: ${runResult.length} instants")
+
+        // Check timeline immediately after compaction to see if commit was 
created
+        val timelineAfterRunDf = spark.sql(s"call show_timeline(table => 
'$tableName')")
+        timelineAfterRunDf.show(false)
+        val timelineAfterRun = timelineAfterRunDf.collect()
+        val actionsAfterRun = timelineAfterRun.map(_.getString(1)).distinct
+        println(s"Actions after compaction run: ${actionsAfterRun.mkString(", 
")}")
+        val hasCommitAfterRun = actionsAfterRun.contains("commit")
+        println(s"Has commit action after compaction: $hasCommitAfterRun")
+
+        // Create more commits to trigger archiving of compaction events
+        for (i <- 6 to 10) {
+          spark.sql(s"insert into $tableName values($i, 'a$i', ${10 * i}, 
${1000 * i})")
+        }
+
+        // Trigger clean to archive commits (this should also archive 
compaction events)
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
+
+        // test show timeline after compaction with archived timeline
+        val timelineResultAfterCompactionDf = spark.sql(s"call 
show_timeline(table => '$tableName', showArchived => true)")
+        timelineResultAfterCompactionDf.show(false)
+        val timelineResultAfterCompaction = 
timelineResultAfterCompactionDf.collect()
+        assert(timelineResultAfterCompaction.length >= 8, "Should have at 
least 8 timeline entries (commits + clean + compaction)")
+
+        // Fix: Use timelineResultAfterCompaction instead of timelineResult
+        val actionsAfterCompaction = 
timelineResultAfterCompaction.map(_.getString(1)).distinct
+        println(s"All actions in timeline after compaction: 
${actionsAfterCompaction.mkString(", ")}")
+        assert(actionsAfterCompaction.contains("deltacommit"), "Should have 
deltacommit actions in timeline")
+
+        // Only assert commit exists if compaction actually created one
+        // Compaction might not create a commit if there's nothing to compact
+        if (hasCommitAfterRun) {
+          assert(actionsAfterCompaction.contains("commit"), "Should have 
commit actions in timeline after compaction")
+        } else {
+          println("Warning: Compaction did not create a commit action. This 
might be expected if there was nothing to compact.")
+          // Check if compaction events exist instead
+          val compactionEvents = timelineResultAfterCompaction.filter(row =>
+            row.getString(1) == "compaction"
+          )
+          if (compactionEvents.nonEmpty) {
+            println(s"Found ${compactionEvents.length} compaction events in 
timeline")
+          }
+        }
+
+        // Check for compaction events in archived timeline
+        val archivedEntries = 
timelineResultAfterCompaction.filter(_.getString(6) == "ARCHIVED")
+        if (archivedEntries.nonEmpty) {
+          val archivedActions = archivedEntries.map(_.getString(1)).distinct
+          // Compaction events (REQUESTED/INFLIGHT) should appear in archived 
timeline if archived
+          // Note: Completed compaction becomes COMMIT_ACTION, so we check for 
"compaction" action
+          val compactionEvents = archivedEntries.filter(row =>
+            row.getString(1) == "compaction" &&
+            (row.getString(2) == "REQUESTED" || row.getString(2) == "INFLIGHT")
+          )
+          // Compaction events may or may not be archived depending on timing
+          // But if they are archived, they should be visible
+          if (compactionEvents.nonEmpty) {
+            println(s"Found ${compactionEvents.length} compaction events in 
archived timeline")
+            compactionEvents.foreach { row =>
+              assert(row.getString(0) != null, "compaction instant_time should 
not be null")
+              assert(row.getString(1) == "compaction", "action should be 
compaction")
+              assert(Set("REQUESTED", "INFLIGHT").contains(row.getString(2)),
+                s"compaction state should be REQUESTED or INFLIGHT, got: 
${row.getString(2)}")
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("Test show_timeline procedure - compare V1 and V2 archived timeline 
behavior") {
+    withSQLConf(
+      "hoodie.keep.min.commits" -> "2",
+      "hoodie.keep.max.commits" -> "3",
+      "hoodie.cleaner.commits.retained" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tableLocation = tmp.getCanonicalPath
+        if (HoodieSparkUtils.isSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled = false")
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             |) using hudi
+             | location '$tableLocation'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'cow',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        // Create multiple commits
+        for (i <- 1 to 6) {
+          spark.sql(s"insert into $tableName values($i, 'a$i', ${10 * i}, 
${1000 * i})")
+        }
+
+        // Test with V2 (current version) first
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
+        val v2TimelineResult = spark.sql(s"call show_timeline(table => 
'$tableName', showArchived => true, limit => 100)").collect()
+
+        // Downgrade to V1
+        spark.sql(s"call downgrade_table(table => '$tableName', to_version => 
'SIX')")
+
+        // Trigger clean again
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
+
+        // Test with V1
+        val v1TimelineResult = spark.sql(s"call show_timeline(table => 
'$tableName', showArchived => true, limit => 100)").collect()
+
+        // Both should work and return timeline entries
+        assert(v1TimelineResult.length > 0, "V1 timeline should return 
entries")
+        assert(v2TimelineResult.length > 0, "V2 timeline should return 
entries")
+
+        // Verify all entries have valid structure
+        (v1TimelineResult ++ v2TimelineResult).foreach { row =>
+          assert(row.getString(0) != null, "instant_time should not be null")
+          assert(row.getString(1) != null, "action should not be null")
+          assert(row.getString(2) != null, "state should not be null")
+          assert(row.getString(6) != null, "timeline_type should not be null")
+          assert(Set("ACTIVE", "ARCHIVED").contains(row.getString(6)),
+            s"timeline_type should be ACTIVE or ARCHIVED, got: 
${row.getString(6)}")
+        }
+      }
+    }
+  }
+
+  test("Test show_timeline procedure with startTime and endTime filtering") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tableLocation = tmp.getCanonicalPath
+      if (HoodieSparkUtils.isSpark3_4) {
+        spark.sql("set spark.sql.defaultColumn.enabled = false")
+      }
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | location '$tableLocation'
+           | tblproperties (
+           |   primaryKey = 'id',
+           |   type = 'cow',
+           |   preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      // Create multiple commits - we'll capture their timestamps
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      val timelineAfterFirst = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+      val firstCommitTime = timelineAfterFirst.head.getString(0)
+
+      // Wait a bit to ensure different timestamps (if needed, add small delay)
+      Thread.sleep(100)
+
+      spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+      val timelineAfterSecond = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+      val secondCommitTime = timelineAfterSecond.head.getString(0)
+
+      Thread.sleep(100)
+
+      spark.sql(s"insert into $tableName values(3, 'a3', 30, 3000)")
+      val timelineAfterThird = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+      val thirdCommitTime = timelineAfterThird.head.getString(0)
+
+      Thread.sleep(100)
+
+      spark.sql(s"insert into $tableName values(4, 'a4', 40, 4000)")
+      val timelineAfterFourth = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+      val fourthCommitTime = timelineAfterFourth.head.getString(0)
+
+      Thread.sleep(100)
+
+      spark.sql(s"insert into $tableName values(5, 'a5', 50, 5000)")
+      val timelineAfterFifth = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+      val fifthCommitTime = timelineAfterFifth.head.getString(0)
+
+      // Get all timeline entries without filtering
+      val allTimelineResult = spark.sql(s"call show_timeline(table => 
'$tableName')").collect()
+      assert(allTimelineResult.length == 5, "Should have 5 timeline entries")
+
+      // Test 1: Filter with both startTime and endTime (inclusive range)
+      // Use secondCommitTime as start and fourthCommitTime as end
+      val rangeFilteredResult = spark.sql(
+        s"call show_timeline(table => '$tableName', startTime => 
'$secondCommitTime', endTime => '$fourthCommitTime')"
+      ).collect()
+
+      // Should include second, third, and fourth commits (inclusive on both 
ends)
+      assert(rangeFilteredResult.length == 3,
+        s"Should have 3 timeline entries in range [$secondCommitTime, 
$fourthCommitTime], got: ${rangeFilteredResult.length}")
+
+      val rangeFilteredTimes = 
rangeFilteredResult.map(_.getString(0)).sorted.reverse
+      assert(rangeFilteredTimes.contains(secondCommitTime), "Should include 
second commit")
+      assert(rangeFilteredTimes.contains(thirdCommitTime), "Should include 
third commit")
+      assert(rangeFilteredTimes.contains(fourthCommitTime), "Should include 
fourth commit")
+      assert(!rangeFilteredTimes.contains(firstCommitTime), "Should not 
include first commit")
+      assert(!rangeFilteredTimes.contains(fifthCommitTime), "Should not 
include fifth commit")
+
+      // Verify all entries in range are within bounds
+      rangeFilteredResult.foreach { row =>
+        val instantTime = row.getString(0)
+        assert(instantTime >= secondCommitTime && instantTime <= 
fourthCommitTime,
+          s"Instant time $instantTime should be in range [$secondCommitTime, 
$fourthCommitTime]")
+      }
+
+      // Test 2: Filter with only startTime (should get all commits >= 
startTime)
+      val startTimeOnlyResult = spark.sql(
+        s"call show_timeline(table => '$tableName', startTime => 
'$thirdCommitTime')"
+      ).collect()
+
+      // Should include third, fourth, and fifth commits
+      assert(startTimeOnlyResult.length == 3,
+        s"Should have 3 timeline entries >= $thirdCommitTime, got: 
${startTimeOnlyResult.length}")
+
+      val startTimeOnlyTimes = 
startTimeOnlyResult.map(_.getString(0)).sorted.reverse
+      assert(startTimeOnlyTimes.contains(thirdCommitTime), "Should include 
third commit")
+      assert(startTimeOnlyTimes.contains(fourthCommitTime), "Should include 
fourth commit")
+      assert(startTimeOnlyTimes.contains(fifthCommitTime), "Should include 
fifth commit")
+      assert(!startTimeOnlyTimes.contains(firstCommitTime), "Should not 
include first commit")
+      assert(!startTimeOnlyTimes.contains(secondCommitTime), "Should not 
include second commit")
+
+      // Verify all entries are >= startTime
+      startTimeOnlyResult.foreach { row =>
+        val instantTime = row.getString(0)
+        assert(instantTime >= thirdCommitTime,
+          s"Instant time $instantTime should be >= $thirdCommitTime")
+      }
+
+      // Test 3: Filter with only endTime (should get all commits <= endTime)
+      val endTimeOnlyResult = spark.sql(
+        s"call show_timeline(table => '$tableName', endTime => 
'$thirdCommitTime')"
+      ).collect()
+
+      // Should include first, second, and third commits
+      assert(endTimeOnlyResult.length == 3,
+        s"Should have 3 timeline entries <= $thirdCommitTime, got: 
${endTimeOnlyResult.length}")
+
+      val endTimeOnlyTimes = 
endTimeOnlyResult.map(_.getString(0)).sorted.reverse
+      assert(endTimeOnlyTimes.contains(firstCommitTime), "Should include first 
commit")
+      assert(endTimeOnlyTimes.contains(secondCommitTime), "Should include 
second commit")
+      assert(endTimeOnlyTimes.contains(thirdCommitTime), "Should include third 
commit")
+      assert(!endTimeOnlyTimes.contains(fourthCommitTime), "Should not include 
fourth commit")
+      assert(!endTimeOnlyTimes.contains(fifthCommitTime), "Should not include 
fifth commit")
+
+      // Verify all entries are <= endTime
+      endTimeOnlyResult.foreach { row =>
+        val instantTime = row.getString(0)
+        assert(instantTime <= thirdCommitTime,
+          s"Instant time $instantTime should be <= $thirdCommitTime")
+      }
+
+      // Test 4: Filter with startTime and endTime where range has no results
+      // Use a range that doesn't include any commits (between commits)
+      val emptyRangeResult = spark.sql(
+        s"call show_timeline(table => '$tableName', startTime => 
'$fifthCommitTime', endTime => '$firstCommitTime')"
+      ).collect()
+
+      // Should return empty since startTime > endTime (invalid range)
+      assert(emptyRangeResult.length == 0,
+        s"Should have 0 timeline entries for invalid range [$fifthCommitTime, 
$firstCommitTime], got: ${emptyRangeResult.length}")
+
+      // Test 5: Filter with startTime and endTime being the same (should 
return that single commit)
+      val singleTimeResult = spark.sql(
+        s"call show_timeline(table => '$tableName', startTime => 
'$thirdCommitTime', endTime => '$thirdCommitTime')"
+      ).collect()
+
+      assert(singleTimeResult.length == 1,
+        s"Should have 1 timeline entry for single time point $thirdCommitTime, 
got: ${singleTimeResult.length}")
+      assert(singleTimeResult.head.getString(0) == thirdCommitTime,
+        s"Should return the commit with time $thirdCommitTime")
+    }
+  }
+
+  /**
+   * Helper method to create a table with all types of commits for 
comprehensive testing.
+   * Creates: completed/inflight commits, deltacommits (MOR), clean, 
compaction, clustering, insert overwrite, rollback
+   */
+  private def setupTableWithAllCommitTypes(tableName: String, tableLocation: 
String, tableType: String): Map[String, String] = {

Review Comment:
   Refer to `TestArchivedTimelineV1.createInstants()` and
   `TestArchivedTimelineV2.writeArchivedTimeline(...)` — existing test helpers that
   already construct timelines with the relevant commit types, which could likely be
   reused here instead of hand-rolling this setup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to