nsivabalan commented on code in PR #14261:
URL: https://github.com/apache/hudi/pull/14261#discussion_r2551656619


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark SQL procedure to show timeline information for a Hudi table.
+ *
+ * This procedure displays comprehensive information about all timeline instants including commits,
+ * compactions, clustering, cleaning, and other table operations. It supports both active and
+ * archived timelines with time-based filtering and generic SQL filters.
+ *
+ * == Parameters ==
+ * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`)
+ * - `path`: Optional. The base path of the Hudi table (mutually exclusive with `table`)
+ * - `limit`: Optional. Maximum number of timeline entries to return (default: 20)
+ * - `showArchived`: Optional. Whether to include archived timeline data (default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty string)
+ * - `startTime`: Optional. Start timestamp for filtering results (format: yyyyMMddHHmmss)
+ * - `endTime`: Optional. End timestamp for filtering results (format: yyyyMMddHHmmss)
+ *
+ * == Output Schema ==
+ * - `instant_time`: Timestamp when the operation was performed
+ * - `action`: The action type (commit, deltacommit, compaction, clustering, clean, rollback, etc.)
+ * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED)
+ * - `requested_time`: Formatted date when the operation was requested (MM-dd HH:mm:ss format)
+ * - `inflight_time`: Formatted date when the operation became inflight (MM-dd HH:mm:ss format)
+ * - `completed_time`: Formatted date when the operation completed (MM-dd HH:mm format, "-" if not completed)
+ * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline
+ * - `rollback_info`: Information about rollback operations (what was rolled back or what rolled back this operation)
+ *
+ * == Error Handling ==
+ * - Throws `IllegalArgumentException` for invalid filter expressions
+ * - Throws `HoodieException` for table access issues
+ * - Returns empty result set if no timeline entries match the criteria
+ * - Gracefully handles archived timeline access failures with warning logs
+ *
+ * == Filter Support ==
+ * The `filter` parameter supports SQL expressions for filtering results on any output column.
+ * The filter uses Spark SQL syntax and supports various data types and operations.
+ *
+ * == Usage Examples ==
+ * {{{
+ * -- Basic usage: Show timeline entries
+ * CALL show_timeline(table => 'hudi_table')
+ *
+ * -- Show timeline with custom limit
+ * CALL show_timeline(table => 'hudi_table', limit => 50)
+ *
+ * -- Include archived timeline data
+ * CALL show_timeline(table => 'hudi_table', showArchived => true)
+ *
+ * -- Filter for specific actions
+ * CALL show_timeline(
+ *   table => 'hudi_table',
+ *   filter => "action = 'commit'"
+ * )
+ *
+ * -- Show timeline with time range and filtering
+ * CALL show_timeline(
+ *   table => 'hudi_table',
+ *   startTime => '20251201000000',
+ *   endTime => '20251231235959',
+ *   filter => "action = 'commit' AND state = 'COMPLETED'"
+ * )
+ * }}}
+ *
+ * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax
+ */
+class ShowTimelineProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
+    ProcedureParameter.optional(3, "showArchived", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "filter", DataTypes.StringType, ""),
+    ProcedureParameter.optional(5, "startTime", DataTypes.StringType, ""),
+    ProcedureParameter.optional(6, "endTime", DataTypes.StringType, "")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("requested_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("inflight_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("completed_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("rollback_info", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val showArchived = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val filter = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val startTime = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val endTime = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+
+    validateFilter(filter, outputType)
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    val timelineEntries = getTimelineEntries(metaClient, limit, showArchived, startTime, endTime)
+
+    applyFilter(timelineEntries, filter, outputType)
+  }
+
+  override def build: Procedure = new ShowTimelineProcedure()
+
+  /**
+   * Retrieves timeline entries from both active and archived timelines based on the provided parameters.
+   *
+   * This is the main orchestration method that coordinates the retrieval of timeline entries. It:
+   * 1. Builds instant information map from the active timeline (for modification time lookups)
+   * 2. Loads rollback information for both active and archived timelines
+   * 3. Optionally loads archived timeline details if `showArchived` is true
+   * 4. Retrieves entries from active timeline
+   * 5. Optionally retrieves entries from archived timeline and combines them
+   * 6. Sorts combined entries by timestamp (descending) and state priority
+   * 7. Applies limit or time range filtering as appropriate
+   *
+   * @param metaClient The Hudi table metadata client for accessing timeline data
+   * @param limit Maximum number of entries to return (ignored if time range is specified)
+   * @param showArchived Whether to include archived timeline entries in the result
+   * @param startTime Optional start timestamp for filtering (format: yyyyMMddHHmmss). If empty, no start filtering is applied
+   * @param endTime Optional end timestamp for filtering (format: yyyyMMddHHmmss). If empty, no end filtering is applied
+   * @return Sequence of Row objects representing timeline entries, sorted by timestamp (descending) and state priority
+   *         (COMPLETED > INFLIGHT > REQUESTED)
+   *
+   * @note When `showArchived` is true, this method will:
+   *       - Reload the archived timeline to ensure it's up-to-date
+   *       - Load completed instant details and compaction details into memory for the specified time range or limit
+   *       - Combine active and archived entries, with archived entries marked with "ARCHIVED" timeline type
+   *
+   * @note The sorting logic prioritizes:
+   *       1. Timestamp (newer first)
+   *       2. State (COMPLETED > INFLIGHT > REQUESTED)
+   *
+   * @see [[getTimelineEntriesFromTimeline]] for extracting entries from a specific timeline
+   * @see [[buildInstantInfoFromTimeline]] for building the instant information map
+   */
+  private def getTimelineEntries(metaClient: HoodieTableMetaClient,
+                                 limit: Int,
+                                 showArchived: Boolean,
+                                 startTime: String,
+                                 endTime: String): Seq[Row] = {
+
+    val instantInfoMap = buildInstantInfoFromTimeline(metaClient)
+
+    val activeRollbackInfoMap = getRolledBackInstantInfo(metaClient.getActiveTimeline, metaClient)
+
+    // Create archived timeline starting from the maximum instant time in active timeline
+    // This way, if all archived instants are older than the active timeline's max instant,
+    // the archived timeline will be empty and won't load anything, avoiding unnecessary loading.
+    // Instead of getArchivedTimeline() which loads with LoadMode.ACTION, we use the startTs
+    // constructor which loads with LoadMode.METADATA, and then load specific details (PLAN for compactions).
+    val (archivedTimeline, archivedRollbackInfoMap) = if (showArchived) {
+      // Get the maximum instant time from active timeline
+      val maxActiveInstantTime = {
+        val lastInstantOpt = metaClient.getActiveTimeline
+          .filterCompletedInstants()
+          .lastInstant()
+        if (lastInstantOpt.isPresent) {
+          lastInstantOpt.get().requestedTime()
+        } else {
+          ""
+        }
+      }
+      // Create archived timeline starting from max active instant time
+      // This will be empty if all archived instants are older than active timeline
+      val timeline = if (maxActiveInstantTime.nonEmpty) {
+        metaClient.getTableFormat().getTimelineFactory()
+          .createArchivedTimeline(metaClient, maxActiveInstantTime)
+//        metaClient.getArchivedTimeline()
+      } else {
+        // If no active instants, create empty timeline
+        metaClient.getArchivedTimeline()
+      }
+      // Load the required details with appropriate LoadMode (METADATA for commits, PLAN for compactions)
+      // Note: loadCompletedInstantDetailsInMemory may have already loaded METADATA via constructor,
+      // but we call it again to ensure we have the data for the specified time range or limit.
+      if (startTime.nonEmpty && endTime.nonEmpty) {
+        timeline.loadCompletedInstantDetailsInMemory(startTime, endTime)
+        timeline.loadCompactionDetailsInMemory(startTime, endTime)
+      } else {
+        timeline.loadCompletedInstantDetailsInMemory(limit)
+        timeline.loadCompactionDetailsInMemory(limit)
+      }
+      val rollbackInfoMap = getRolledBackInstantInfo(timeline, metaClient)
+      (timeline, rollbackInfoMap)
+    } else {
+      (null, Map.empty[String, List[String]])
+    }
+
+    val activeEntries = getTimelineEntriesFromTimeline(
+      metaClient.getActiveTimeline, "ACTIVE", metaClient, instantInfoMap, activeRollbackInfoMap, limit, startTime, endTime
+    )
+
+    val finalEntries = if (showArchived) {
+      val archivedEntries = getTimelineEntriesFromTimeline(
+        archivedTimeline, "ARCHIVED", metaClient, instantInfoMap, archivedRollbackInfoMap, limit, startTime, endTime
+      )
+      val combinedEntries = (activeEntries ++ archivedEntries)
+        .sortWith((a, b) => {
+          val timePriorityOrder = a.getString(0).compareTo(b.getString(0))

Review Comment:
   Let's use RequestedTimeBasedComparator instead of rolling our own comparator.
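The ordering the hand-rolled `sortWith` above tries to implement (newer instants first, then COMPLETED before INFLIGHT before REQUESTED) can be expressed as one composed `Ordering` rather than manual comparisons. A minimal standalone sketch, purely illustrative: this is neither the PR's code nor Hudi's `RequestedTimeBasedComparator`, which the review recommends using instead of any hand-rolled variant.

```scala
// Standalone sketch of the intended sort order, NOT Hudi's comparator:
// instant time descending, then state priority
// (COMPLETED before INFLIGHT before REQUESTED).
object TimelineOrderingSketch {
  private val statePriority = Map("COMPLETED" -> 0, "INFLIGHT" -> 1, "REQUESTED" -> 2)

  // Each entry is (instant_time, state); yyyyMMddHHmmss strings sort chronologically,
  // so reversing the String ordering yields newest-first.
  def sortEntries(entries: Seq[(String, String)]): Seq[(String, String)] =
    entries.sortBy { case (ts, state) =>
      (ts, statePriority.getOrElse(state, Int.MaxValue))
    }(Ordering.Tuple2(Ordering.String.reverse, Ordering.Int))
}
```

Delegating to a library comparator, as the review suggests, keeps this ordering consistent with the rest of the codebase instead of duplicating the logic here.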



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+  private def getTimelineEntries(metaClient: HoodieTableMetaClient,
+                                 limit: Int,
+                                 showArchived: Boolean,
+                                 startTime: String,
+                                 endTime: String): Seq[Row] = {
+
+    val instantInfoMap = buildInstantInfoFromTimeline(metaClient)
+
+    val activeRollbackInfoMap = getRolledBackInstantInfo(metaClient.getActiveTimeline, metaClient)
+
+    // Create archived timeline starting from the maximum instant time in active timeline
+    // This way, if all archived instants are older than the active timeline's max instant,
+    // the archived timeline will be empty and won't load anything, avoiding unnecessary loading.
+    // Instead of getArchivedTimeline() which loads with LoadMode.ACTION, we use the startTs
+    // constructor which loads with LoadMode.METADATA, and then load specific details (PLAN for compactions).
+    val (archivedTimeline, archivedRollbackInfoMap) = if (showArchived) {
+      // Get the maximum instant time from active timeline
+      val maxActiveInstantTime = {
+        val lastInstantOpt = metaClient.getActiveTimeline
+          .filterCompletedInstants()

Review Comment:
   Let's use the first entry in the active timeline instead.
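The point here is that the archived-timeline boundary should come from the first (oldest) instant of the active timeline rather than the last completed one: everything at or after the first active instant is already visible in the active timeline. A minimal standalone sketch of that boundary logic, purely illustrative and not Hudi's API; it assumes instant times are `yyyyMMddHHmmss` strings, so lexicographic order is chronological.

```scala
// Standalone sketch (not Hudi API): bound the archived scan at the FIRST
// active instant. Instants >= that boundary are served by the active
// timeline; only strictly older ones need to come from the archive.
object ArchivedBoundarySketch {
  def archivedInstants(archive: Seq[String], activeInstants: Seq[String]): Seq[String] =
    activeInstants.sorted.headOption match {
      case Some(firstActive) => archive.filter(_ < firstActive)
      case None              => archive // empty active timeline: the whole archive applies
    }
}
```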



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+  private def getTimelineEntries(metaClient: HoodieTableMetaClient,
+                                 limit: Int,
+                                 showArchived: Boolean,
+                                 startTime: String,
+                                 endTime: String): Seq[Row] = {
+
+    val instantInfoMap = buildInstantInfoFromTimeline(metaClient)
+
+    val activeRollbackInfoMap = getRolledBackInstantInfo(metaClient.getActiveTimeline, metaClient)
+
+    // Create archived timeline starting from the maximum instant time in active timeline
+    // This way, if all archived instants are older than the active timeline's max instant,
+    // the archived timeline will be empty and won't load anything, avoiding unnecessary loading.
+    // Instead of getArchivedTimeline() which loads with LoadMode.ACTION, we use the startTs
+    // constructor which loads with LoadMode.METADATA, and then load specific details (PLAN for compactions).
+    val (archivedTimeline, archivedRollbackInfoMap) = if (showArchived) {
+      // Get the maximum instant time from active timeline
+      val maxActiveInstantTime = {
+        val lastInstantOpt = metaClient.getActiveTimeline
+          .filterCompletedInstants()
+          .lastInstant()
+        if (lastInstantOpt.isPresent) {
+          lastInstantOpt.get().requestedTime()
+        } else {
+          ""
+        }
+      }
+      // Create archived timeline starting from max active instant time
+      // This will be empty if all archived instants are older than active timeline
+      val timeline = if (maxActiveInstantTime.nonEmpty) {
+        metaClient.getTableFormat().getTimelineFactory()
+          .createArchivedTimeline(metaClient, maxActiveInstantTime)
+//        metaClient.getArchivedTimeline()
+      } else {
+        // If no active instants, create empty timeline
+        metaClient.getArchivedTimeline()
+      }
+      // Load the required details with appropriate LoadMode (METADATA for commits, PLAN for compactions)
+      // Note: loadCompletedInstantDetailsInMemory may have already loaded METADATA via constructor,
+      // but we call it again to ensure we have the data for the specified time range or limit.
+      if (startTime.nonEmpty && endTime.nonEmpty) {
+        timeline.loadCompletedInstantDetailsInMemory(startTime, endTime)
+        timeline.loadCompactionDetailsInMemory(startTime, endTime)
+      } else {
+        timeline.loadCompletedInstantDetailsInMemory(limit)
+        timeline.loadCompactionDetailsInMemory(limit)
+      }
+      val rollbackInfoMap = getRolledBackInstantInfo(timeline, metaClient)
+      (timeline, rollbackInfoMap)
+    } else {
+      (null, Map.empty[String, List[String]])
+    }
+
+    val activeEntries = getTimelineEntriesFromTimeline(
+      metaClient.getActiveTimeline, "ACTIVE", metaClient, instantInfoMap, activeRollbackInfoMap, limit, startTime, endTime
+    )
+
+    val finalEntries = if (showArchived) {
+      val archivedEntries = getTimelineEntriesFromTimeline(
+        archivedTimeline, "ARCHIVED", metaClient, instantInfoMap, archivedRollbackInfoMap, limit, startTime, endTime
+      )
+      val combinedEntries = (activeEntries ++ archivedEntries)
+        .sortWith((a, b) => {
+          val timePriorityOrder = a.getString(0).compareTo(b.getString(0))
+          if (timePriorityOrder != 0) {
+            timePriorityOrder > 0
+          } else {
+            val statePriorityOrder = Map("COMPLETED" -> 3, "INFLIGHT" -> 2, "REQUESTED" -> 1)
+            val state1 = a.getString(2)
+            val state2 = b.getString(2)
+            val priority1 = statePriorityOrder.getOrElse(state1, 0)
+            val priority2 = statePriorityOrder.getOrElse(state2, 0)
+            priority1 > priority2
+          }
+        })
+
+      if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+        combinedEntries

Review Comment:
   Let's try to apply code reuse wherever possible.
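   For instance, the "time range vs. limit" truncation appears in both branches of `finalEntries`. A minimal sketch of one way to factor it out (the helper name is illustrative, not part of the PR):

   ```scala
   // Hypothetical helper (name and placement are illustrative): centralizes the
   // "return everything when a time range bounded the result, otherwise honor
   // the limit" decision that is currently duplicated across the branches.
   def applyLimitOrTimeRange[T](entries: Seq[T], startTime: String, endTime: String, limit: Int): Seq[T] = {
     if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
       entries // the time range already bounded the result; return all matches
     } else {
       entries.take(limit)
     }
   }
   ```

   Both the `showArchived` and active-only branches could then end with a single call to this helper.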



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+      val timeline = if (maxActiveInstantTime.nonEmpty) {
+        metaClient.getTableFormat().getTimelineFactory()
+          .createArchivedTimeline(metaClient, maxActiveInstantTime)
+//        metaClient.getArchivedTimeline()

Review Comment:
   clean up unwanted changes



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineLoaderV2.java:
##########
@@ -56,29 +59,58 @@ public void loadInstants(HoodieTableMetaClient metaClient,
       // List all files
       List<String> fileNames = LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath()).getFileNames();
 
+      // Check if consumer supports early termination
+      StoppableRecordConsumer stoppable = recordConsumer instanceof StoppableRecordConsumer
+          ? (StoppableRecordConsumer) recordConsumer
+          : null;
+
+      // Filter files by time range
+      List<String> filteredFiles = new ArrayList<>();
+      for (String fileName : fileNames) {
+        if (filter == null || LSMTimeline.isFileInRange(filter, fileName)) {
+          filteredFiles.add(fileName);
+        }
+      }
+
+      // Sort files in reverse chronological order if needed (newest first for limit queries)
+      if (stoppable != null && stoppable.needsReverseOrder()) {
+        filteredFiles.sort(Comparator.comparing((String fileName) -> {
+          try {
+            return LSMTimeline.getMaxInstantTime(fileName);
+          } catch (Exception e) {
+            return "";
+          }
+        }).reversed());
+      }
+
       Schema readSchema = LSMTimeline.getReadSchema(loadMode);
-      fileNames.stream()
-          .filter(fileName -> filter == null || LSMTimeline.isFileInRange(filter, fileName))
-          .parallel().forEach(fileName -> {
-            // Read the archived file
-            try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(metaClient.getStorage())
-                .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-                .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, new StoragePath(metaClient.getArchivePath(), fileName))) {
-              try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema)) {
-                while (iterator.hasNext()) {
-                  GenericRecord record = (GenericRecord) iterator.next();
-                  String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
-                  if ((filter == null || filter.isInRange(instantTime))
-                      && commitsFilter.apply(record)) {
-                    recordConsumer.accept(instantTime, record);
-                  }
-                }
+      filteredFiles.parallelStream().forEach(fileName -> {
+        if (stoppable != null && stoppable.shouldStop()) {

Review Comment:
   Actually, we can't marry `limit` and parallel streams. So, whenever we want to apply a `limit`, let's avoid parallel processing.
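   A minimal Scala sketch of the suggested shape (names are illustrative, not the PR's API): a limit implies ordered, early-terminating consumption, which a parallel stream cannot guarantee, so only the unbounded case is parallelized.

   ```scala
   // Illustrative only: route limited consumption through a sequential,
   // early-terminating loop; keep parallelism for the unlimited case.
   def consumeFiles(files: java.util.List[String], limit: Option[Int])(consume: String => Unit): Unit = {
     limit match {
       case Some(n) =>
         // Limited: sequential iteration lets us stop after n files, in order.
         val it = files.iterator()
         var taken = 0
         while (it.hasNext && taken < n) {
           consume(it.next())
           taken += 1
         }
       case None =>
         // Unlimited: order and early exit don't matter, so parallelism is safe.
         files.parallelStream().forEach(f => consume(f))
     }
   }
   ```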



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+    val finalEntries = if (showArchived) {
+      val archivedEntries = getTimelineEntriesFromTimeline(
+        archivedTimeline, "ARCHIVED", metaClient, instantInfoMap, archivedRollbackInfoMap, limit, startTime, endTime
+      )
+      val combinedEntries = (activeEntries ++ archivedEntries)
+        .sortWith((a, b) => {
+          val timePriorityOrder = a.getString(0).compareTo(b.getString(0))
+          if (timePriorityOrder != 0) {
+            timePriorityOrder > 0
+          } else {
+            val statePriorityOrder = Map("COMPLETED" -> 3, "INFLIGHT" -> 2, "REQUESTED" -> 1)
+            val state1 = a.getString(2)
+            val state2 = b.getString(2)
+            val priority1 = statePriorityOrder.getOrElse(state1, 0)
+            val priority2 = statePriorityOrder.getOrElse(state2, 0)
+            priority1 > priority2
+          }
+        })
+
+      if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+        combinedEntries
+      } else {
+        combinedEntries.take(limit)
+      }
+    } else {
+      if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+        activeEntries
+      } else {
+        activeEntries.take(limit)
+      }
+    }
+
+    finalEntries
+  }
+
+  /**
+   * Extracts timeline entries from a specific timeline (active or archived) and converts them to Row objects.
+   *
+   * This method processes instants from the given timeline by:
+   * 1. Filtering instants by time range using the timeline's built-in filtering methods if `startTime` or `endTime` are provided
+   * 2. Retrieving the filtered instants from the timeline
+   * 3. Sorting the filtered instants by timestamp in descending order (newest first)
+   * 4. Applying the limit unless both bounds of the time range are specified (when both are specified, all matching entries are returned)
+   * 5. Converting each instant to a Row using `createTimelineEntry`
+   *
+   * @param timeline The timeline to extract entries from (can be the active or archived timeline)
+   * @param timelineType The type of timeline: "ACTIVE" or "ARCHIVED". This is used to mark entries in the output
+   * @param metaClient The Hudi table metadata client for accessing timeline metadata
+   * @param instantInfoMap Map of instant timestamps to their state information and modification times.
+   *                      Used for formatting dates in the output. Only contains active timeline entries.
+   * @param rollbackInfoMap Map of instant timestamps to the list of rollback instants that rolled them back.
+   *                       Used for populating rollback information in the output
+   * @param limit Maximum number of entries to return. Not applied when both `startTime` and `endTime` are specified
+   * @param startTime Optional start timestamp for filtering (format: yyyyMMddHHmmss).
+   *                 If non-empty, only instants with timestamp >= startTime are included
+   * @param endTime Optional end timestamp for filtering (format: yyyyMMddHHmmss).
+   *               If non-empty, only instants with timestamp <= endTime are included
+   * @return Sequence of Row objects representing timeline entries from the specified timeline,
+   *         sorted by timestamp (descending), limited if applicable
+   *
+   * @note Time range filtering is inclusive on both ends (startTime <= instantTime <= endTime)
+   * @note If both `startTime` and `endTime` are provided, uses `findInstantsInClosedRange`; the limit is ignored and all matching entries are returned
+   * @note If only `startTime` is provided, uses `findInstantsAfterOrEquals`
+   * @note If only `endTime` is provided, uses `findInstantsBeforeOrEquals`
+   * @note If neither `startTime` nor `endTime` is provided, the limit is applied to the sorted results
+   *
+   * @see [[createTimelineEntry]] for the conversion of HoodieInstant to Row
+   * @see [[getTimelineEntries]] for the main method that orchestrates timeline entry retrieval
+   * @see [[HoodieTimeline.findInstantsInClosedRange]] for the range filtering implementation
+   */
+  private def getTimelineEntriesFromTimeline(timeline: HoodieTimeline,
+                                             timelineType: String,
+                                             metaClient: HoodieTableMetaClient,
+                                             instantInfoMap: Map[String, Map[HoodieInstant.State, HoodieInstantWithModTime]],
+                                             rollbackInfoMap: Map[String, List[String]],
+                                             limit: Int,
+                                             startTime: String,
+                                             endTime: String): Seq[Row] = {
+
+    // Use the timeline's built-in filtering methods for better performance and consistency
+    val filteredTimeline = {
+      val startTimeTrimmed = startTime.trim
+      val endTimeTrimmed = endTime.trim
+      if (startTimeTrimmed.nonEmpty && endTimeTrimmed.nonEmpty) {
+        // Both start and end time provided: use the closed range [startTime, endTime]
+        timeline.findInstantsInClosedRange(startTimeTrimmed, endTimeTrimmed)
+      } else if (startTimeTrimmed.nonEmpty) {
+        // Only start time provided: get instants >= startTime
+        timeline.findInstantsAfterOrEquals(startTimeTrimmed)
+      } else if (endTimeTrimmed.nonEmpty) {
+        // Only end time provided: get instants <= endTime
+        timeline.findInstantsBeforeOrEquals(endTimeTrimmed)
+      } else {
+        // No time filtering: use original timeline
+        timeline
+      }
+    }
+
+    val instants = filteredTimeline.getInstants.iterator().asScala.toSeq
+
+    // Sort by timestamp in descending order (newest first)
+    val sortedInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime())
+
+    // Apply limit only if time range is not fully specified
+    val limitedInstants = if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+      sortedInstants
+    } else {
+      sortedInstants.take(limit)
+    }
+
+    limitedInstants.map { instant =>
+      createTimelineEntry(instant, timelineType, metaClient, instantInfoMap, rollbackInfoMap)
+    }
+  }
+
+  private def getRolledBackInstantInfo(timeline: HoodieTimeline, metaClient: HoodieTableMetaClient): Map[String, List[String]] = {
+    val rollbackInfoMap = scala.collection.mutable.Map[String, scala.collection.mutable.ListBuffer[String]]()
+
+    val rollbackInstants = timeline.filter(instant =>
+      HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction)).getInstants
+
+    rollbackInstants.asScala.foreach { rollbackInstant =>
+      try {
+        if (rollbackInstant.isInflight) {

Review Comment:
   Let's do
   ```
   if (!rollbackInstant.isComplete...) {
   ```
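To make the suggested guard concrete, here is a minimal, self-contained sketch of the accumulation logic in `getRolledBackInstantInfo` with the branch widened from inflight-only to any non-completed rollback. The `RollbackInstant` case class and its fields are stand-ins for illustration, not the real `HoodieInstant`/`HoodieTimeline` API:

```scala
import scala.collection.mutable

// Stand-in for a rollback instant; hypothetical fields for illustration only.
case class RollbackInstant(requestedTime: String,
                           completed: Boolean,
                           plannedTarget: String,          // from the rollback plan
                           committedTargets: List[String]) // from rollback metadata

// Maps each rolled-back instant time to the rollback instants that targeted it.
// Non-completed rollbacks (REQUESTED or INFLIGHT) are resolved from the plan;
// only completed ones from the rollback metadata.
def rolledBackInfo(rollbacks: Seq[RollbackInstant]): Map[String, List[String]] = {
  val acc = mutable.Map[String, mutable.ListBuffer[String]]()
  rollbacks.foreach { rb =>
    val targets = if (!rb.completed) List(rb.plannedTarget) else rb.committedTargets
    targets.foreach { t =>
      acc.getOrElseUpdate(t, mutable.ListBuffer[String]()).append(rb.requestedTime)
    }
  }
  acc.map { case (k, v) => k -> v.toList }.toMap
}
```

With this guard, a REQUESTED rollback also contributes its planned target, which the `isInflight` check alone would miss.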
   



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+  private def getRolledBackInstantInfo(timeline: HoodieTimeline, metaClient: HoodieTableMetaClient): Map[String, List[String]] = {
+    val rollbackInfoMap = scala.collection.mutable.Map[String, scala.collection.mutable.ListBuffer[String]]()
+
+    val rollbackInstants = timeline.filter(instant =>
+      HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction)).getInstants
+
+    rollbackInstants.asScala.foreach { rollbackInstant =>
+      try {
+        if (rollbackInstant.isInflight) {
+          val instantToUse = metaClient.createNewInstant(
+            HoodieInstant.State.REQUESTED, rollbackInstant.getAction, rollbackInstant.requestedTime())
+          val metadata = timeline.readRollbackPlan(instantToUse)
+          val rolledBackInstant = metadata.getInstantToRollback.getCommitTime
+          rollbackInfoMap.getOrElseUpdate(rolledBackInstant, scala.collection.mutable.ListBuffer[String]())
+            .append(rollbackInstant.requestedTime())
+        } else {
+          val metadata = timeline.readRollbackMetadata(rollbackInstant)
+          metadata.getCommitsRollback.asScala.foreach { instant =>
+            rollbackInfoMap.getOrElseUpdate(instant, scala.collection.mutable.ListBuffer[String]())
+              .append(rollbackInstant.requestedTime())
+          }
+        }
+      } catch {
+        case _: Exception => // Skip invalid rollback instants
+      }
+    }
+    rollbackInfoMap.map { case (k, v) => k -> v.toList }.toMap
+  }
+
+  private def getRollbackInfo(instant: HoodieInstant, timeline: HoodieTimeline, rollbackInfoMap: Map[String, List[String]], metaClient: HoodieTableMetaClient): String = {
+    try {
+      if (HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction)) {
+        if (instant.isInflight) {

Review Comment:
   same comment as above
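Applying the same change in `getRollbackInfo`, the branch would key off completion rather than inflight state. A standalone sketch of the intended branch selection (stand-in state objects, not the real `HoodieInstant` API):

```scala
// Stand-in states for a rollback instant; illustration only.
sealed trait RollbackState
case object Requested extends RollbackState
case object Inflight extends RollbackState
case object Completed extends RollbackState

// With a `!isCompleted`-style guard, both REQUESTED and INFLIGHT rollbacks
// take the plan-reading branch; only COMPLETED reads the rollback metadata.
def readsPlan(state: RollbackState): Boolean = state != Completed
```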



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark SQL procedure to show timeline information for a Hudi table.
+ *
+ * This procedure displays comprehensive information about all timeline 
instants including commits,
+ * compactions, clustering, cleaning, and other table operations. It supports 
both active and
+ * archived timelines with time-based filtering and generic SQL filters.
+ *
+ * == Parameters ==
+ * - `table`: Optional. The name of the Hudi table to query (mutually 
exclusive with `path`)
+ * - `path`: Optional. The base path of the Hudi table (mutually exclusive 
with `table`)
+ * - `limit`: Optional. Maximum number of timeline entries to return (default: 
20)
+ * - `showArchived`: Optional. Whether to include archived timeline data 
(default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty 
string)
+ * - `startTime`: Optional. Start timestamp for filtering results (format: 
yyyyMMddHHmmss)
+ * - `endTime`: Optional. End timestamp for filtering results (format: 
yyyyMMddHHmmss)
+ *
+ * == Output Schema ==
+ * - `instant_time`: Timestamp when the operation was performed
+ * - `action`: The action type (commit, deltacommit, compaction, clustering, 
clean, rollback, etc.)
+ * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED)
+ * - `requested_time`: Formatted date when the operation was requested (MM-dd 
HH:mm:ss format)
+ * - `inflight_time`: Formatted date when the operation became inflight (MM-dd 
HH:mm:ss format)
+ * - `completed_time`: Formatted date when the operation completed (MM-dd 
HH:mm format, "-" if not completed)
+ * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline
+ * - `rollback_info`: Information about rollback operations (what was rolled 
back or what rolled back this operation)
+ *
+ * == Error Handling ==
+ * - Throws `IllegalArgumentException` for invalid filter expressions
+ * - Throws `HoodieException` for table access issues
+ * - Returns empty result set if no timeline entries match the criteria
+ * - Gracefully handles archived timeline access failures with warning logs
+ *
+ * == Filter Support ==
+ * The `filter` parameter supports SQL expressions for filtering results on 
any output column.
+ * The filter uses Spark SQL syntax and supports various data types and 
operations.
+ *
+ * == Usage Examples ==
+ * {{{
+ * -- Basic usage: Show timeline entries
+ * CALL show_timeline(table => 'hudi_table')
+ *
+ * -- Show timeline with custom limit
+ * CALL show_timeline(table => 'hudi_table', limit => 50)
+ *
+ * -- Include archived timeline data
+ * CALL show_timeline(table => 'hudi_table', showArchived => true)
+ *
+ * -- Filter for specific actions
+ * CALL show_timeline(
+ *   table => 'hudi_table',
+ *   filter => "action = 'commit'"
+ * )
+ *
+ * -- Show timeline with time range and filtering
+ * CALL show_timeline(
+ *   table => 'hudi_table',
+ *   startTime => '20251201000000',
+ *   endTime => '20251231235959',
+ *   filter => "action = 'commit' AND state = 'COMPLETED'"
+ * )
+ * }}}
+ *
+ * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax
+ */
+class ShowTimelineProcedure extends BaseProcedure with ProcedureBuilder with 
SparkAdapterSupport with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
+    ProcedureParameter.optional(3, "showArchived", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(4, "filter", DataTypes.StringType, ""),
+    ProcedureParameter.optional(5, "startTime", DataTypes.StringType, ""),
+    ProcedureParameter.optional(6, "endTime", DataTypes.StringType, "")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("requested_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("inflight_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("completed_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("rollback_info", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val showArchived = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val filter = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val startTime = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val endTime = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+
+    validateFilter(filter, outputType)
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    val timelineEntries = getTimelineEntries(metaClient, limit, showArchived, startTime, endTime)
+
+    applyFilter(timelineEntries, filter, outputType)
+  }
+
+  override def build: Procedure = new ShowTimelineProcedure()
+
+  /**
+   * Retrieves timeline entries from both active and archived timelines based on the provided parameters.
+   *
+   * This is the main orchestration method that coordinates the retrieval of timeline entries. It:
+   * 1. Builds instant information map from the active timeline (for modification time lookups)
+   * 2. Loads rollback information for both active and archived timelines
+   * 3. Optionally loads archived timeline details if `showArchived` is true
+   * 4. Retrieves entries from active timeline
+   * 5. Optionally retrieves entries from archived timeline and combines them
+   * 6. Sorts combined entries by timestamp (descending) and state priority
+   * 7. Applies limit or time range filtering as appropriate
+   *
+   * @param metaClient The Hudi table metadata client for accessing timeline data
+   * @param limit Maximum number of entries to return (ignored if time range is specified)
+   * @param showArchived Whether to include archived timeline entries in the result
+   * @param startTime Optional start timestamp for filtering (format: yyyyMMddHHmmss). If empty, no start filtering is applied
+   * @param endTime Optional end timestamp for filtering (format: yyyyMMddHHmmss). If empty, no end filtering is applied
+   * @return Sequence of Row objects representing timeline entries, sorted by timestamp (descending) and state priority
+   *         (COMPLETED > INFLIGHT > REQUESTED)
+   *
+   * @note When `showArchived` is true, this method will:
+   *       - Reload the archived timeline to ensure it's up-to-date
+   *       - Load completed instant details and compaction details into memory for the specified time range or limit
+   *       - Combine active and archived entries, with archived entries marked with "ARCHIVED" timeline type
+   *
+   * @note The sorting logic prioritizes:
+   *       1. Timestamp (newer first)
+   *       2. State (COMPLETED > INFLIGHT > REQUESTED)
+   *
+   * @see [[getTimelineEntriesFromTimeline]] for extracting entries from a specific timeline
+   * @see [[buildInstantInfoFromTimeline]] for building the instant information map
+   */
+  private def getTimelineEntries(metaClient: HoodieTableMetaClient,
+                                 limit: Int,
+                                 showArchived: Boolean,
+                                 startTime: String,
+                                 endTime: String): Seq[Row] = {
+
+    val instantInfoMap = buildInstantInfoFromTimeline(metaClient)
+
+    val activeRollbackInfoMap = getRolledBackInstantInfo(metaClient.getActiveTimeline, metaClient)
+
+    // Create archived timeline starting from the maximum instant time in active timeline
+    // This way, if all archived instants are older than the active timeline's max instant,
+    // the archived timeline will be empty and won't load anything, avoiding unnecessary loading.
+    // Instead of getArchivedTimeline() which loads with LoadMode.ACTION, we use the startTs
+    // constructor which loads with LoadMode.METADATA, and then load specific details (PLAN for compactions).
+    val (archivedTimeline, archivedRollbackInfoMap) = if (showArchived) {
+      // Get the maximum instant time from active timeline
+      val maxActiveInstantTime = {
+        val lastInstantOpt = metaClient.getActiveTimeline
+          .filterCompletedInstants()
+          .lastInstant()
+        if (lastInstantOpt.isPresent) {
+          lastInstantOpt.get().requestedTime()
+        } else {
+          ""

Review Comment:
   let's use `HoodieTimeline.INIT_INSTANT_TS` as the placeholder. 
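   A minimal sketch of why a zero-style sentinel (what `HoodieTimeline.INIT_INSTANT_TS` provides) is safer than `""` here. Hudi instant times are fixed-width `yyyyMMddHHmmss` strings, so lexicographic order equals chronological order; the sentinel's exact value below is an assumption, not checked against the Hudi sources:

```java
// Standalone sketch (not Hudi code): a zero-timestamp sentinel sorts before
// every real instant under the lexicographic comparison used for fixed-width
// yyyyMMddHHmmss instant times, so no empty-string special case is needed.
public class SentinelDemo {
    // Assumed sentinel value; see HoodieTimeline.INIT_INSTANT_TS for the real constant.
    static final String INIT_INSTANT_TS = "00000000000000";

    // Lexicographic comparison is chronological for fixed-width timestamps.
    static boolean isAfterOrEquals(String instantTs, String startTs) {
        return instantTs.compareTo(startTs) >= 0;
    }

    public static void main(String[] args) {
        // Every real instant compares >= the sentinel...
        System.out.println(isAfterOrEquals("20251201000000", INIT_INSTANT_TS)); // true
        // ...while "" sorts before everything, which is why it needs special-casing.
        System.out.println(isAfterOrEquals("", INIT_INSTANT_TS));               // false
    }
}
```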



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTimelineProcedure.scala:
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark SQL procedure to show timeline information for a Hudi table.
+ *
+ * This procedure displays comprehensive information about all timeline instants including commits,
+ * compactions, clustering, cleaning, and other table operations. It supports both active and
+ * archived timelines with time-based filtering and generic SQL filters.
+ *
+ * == Parameters ==
+ * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`)
+ * - `path`: Optional. The base path of the Hudi table (mutually exclusive with `table`)
+ * - `limit`: Optional. Maximum number of timeline entries to return (default: 20)
+ * - `showArchived`: Optional. Whether to include archived timeline data (default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty string)
+ * - `startTime`: Optional. Start timestamp for filtering results (format: yyyyMMddHHmmss)
+ * - `endTime`: Optional. End timestamp for filtering results (format: yyyyMMddHHmmss)
+ *
+ * == Output Schema ==
+ * - `instant_time`: Timestamp when the operation was performed
+ * - `action`: The action type (commit, deltacommit, compaction, clustering, clean, rollback, etc.)
+ * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED)
+ * - `requested_time`: Formatted date when the operation was requested (MM-dd HH:mm:ss format)
+ * - `inflight_time`: Formatted date when the operation became inflight (MM-dd HH:mm:ss format)
+ * - `completed_time`: Formatted date when the operation completed (MM-dd HH:mm format, "-" if not completed)
+ * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline
+ * - `rollback_info`: Information about rollback operations (what was rolled back or what rolled back this operation)
+ *
+ * == Error Handling ==
+ * - Throws `IllegalArgumentException` for invalid filter expressions
+ * - Throws `HoodieException` for table access issues
+ * - Returns empty result set if no timeline entries match the criteria
+ * - Gracefully handles archived timeline access failures with warning logs
+ *
+ * == Filter Support ==
+ * The `filter` parameter supports SQL expressions for filtering results on any output column.
+ * The filter uses Spark SQL syntax and supports various data types and operations.
+ *
+ * == Usage Examples ==
+ * {{{
+ * -- Basic usage: Show timeline entries
+ * CALL show_timeline(table => 'hudi_table')
+ *
+ * -- Show timeline with custom limit
+ * CALL show_timeline(table => 'hudi_table', limit => 50)
+ *
+ * -- Include archived timeline data
+ * CALL show_timeline(table => 'hudi_table', showArchived => true)
+ *
+ * -- Filter for specific actions
+ * CALL show_timeline(
+ *   table => 'hudi_table',
+ *   filter => "action = 'commit'"
+ * )
+ *
+ * -- Show timeline with time range and filtering
+ * CALL show_timeline(
+ *   table => 'hudi_table',
+ *   startTime => '20251201000000',
+ *   endTime => '20251231235959',
+ *   filter => "action = 'commit' AND state = 'COMPLETED'"
+ * )
+ * }}}
+ *
+ * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax
+ */
+class ShowTimelineProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
+    ProcedureParameter.optional(3, "showArchived", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "filter", DataTypes.StringType, ""),
+    ProcedureParameter.optional(5, "startTime", DataTypes.StringType, ""),
+    ProcedureParameter.optional(6, "endTime", DataTypes.StringType, "")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("requested_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("inflight_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("completed_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("rollback_info", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val showArchived = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val filter = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val startTime = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val endTime = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+
+    validateFilter(filter, outputType)
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    val timelineEntries = getTimelineEntries(metaClient, limit, showArchived, startTime, endTime)
+
+    applyFilter(timelineEntries, filter, outputType)
+  }
+
+  override def build: Procedure = new ShowTimelineProcedure()
+
+  /**
+   * Retrieves timeline entries from both active and archived timelines based on the provided parameters.
+   *
+   * This is the main orchestration method that coordinates the retrieval of timeline entries. It:
+   * 1. Builds instant information map from the active timeline (for modification time lookups)
+   * 2. Loads rollback information for both active and archived timelines
+   * 3. Optionally loads archived timeline details if `showArchived` is true
+   * 4. Retrieves entries from active timeline
+   * 5. Optionally retrieves entries from archived timeline and combines them
+   * 6. Sorts combined entries by timestamp (descending) and state priority
+   * 7. Applies limit or time range filtering as appropriate
+   *
+   * @param metaClient The Hudi table metadata client for accessing timeline data
+   * @param limit Maximum number of entries to return (ignored if time range is specified)
+   * @param showArchived Whether to include archived timeline entries in the result
+   * @param startTime Optional start timestamp for filtering (format: yyyyMMddHHmmss). If empty, no start filtering is applied
+   * @param endTime Optional end timestamp for filtering (format: yyyyMMddHHmmss). If empty, no end filtering is applied
+   * @return Sequence of Row objects representing timeline entries, sorted by timestamp (descending) and state priority
+   *         (COMPLETED > INFLIGHT > REQUESTED)
+   *
+   * @note When `showArchived` is true, this method will:
+   *       - Reload the archived timeline to ensure it's up-to-date
+   *       - Load completed instant details and compaction details into memory for the specified time range or limit
+   *       - Combine active and archived entries, with archived entries marked with "ARCHIVED" timeline type
+   *
+   * @note The sorting logic prioritizes:
+   *       1. Timestamp (newer first)
+   *       2. State (COMPLETED > INFLIGHT > REQUESTED)
+   *
+   * @see [[getTimelineEntriesFromTimeline]] for extracting entries from a specific timeline
+   * @see [[buildInstantInfoFromTimeline]] for building the instant information map
+   */
+  private def getTimelineEntries(metaClient: HoodieTableMetaClient,
+                                 limit: Int,
+                                 showArchived: Boolean,
+                                 startTime: String,
+                                 endTime: String): Seq[Row] = {
+
+    val instantInfoMap = buildInstantInfoFromTimeline(metaClient)
+
+    val activeRollbackInfoMap = getRolledBackInstantInfo(metaClient.getActiveTimeline, metaClient)
+
+    // Create archived timeline starting from the maximum instant time in active timeline
+    // This way, if all archived instants are older than the active timeline's max instant,
+    // the archived timeline will be empty and won't load anything, avoiding unnecessary loading.
+    // Instead of getArchivedTimeline() which loads with LoadMode.ACTION, we use the startTs
+    // constructor which loads with LoadMode.METADATA, and then load specific details (PLAN for compactions).
+    val (archivedTimeline, archivedRollbackInfoMap) = if (showArchived) {
+      // Get the maximum instant time from active timeline
+      val maxActiveInstantTime = {
+        val lastInstantOpt = metaClient.getActiveTimeline
+          .filterCompletedInstants()
+          .lastInstant()
+        if (lastInstantOpt.isPresent) {
+          lastInstantOpt.get().requestedTime()
+        } else {
+          ""
+        }
+      }
+      // Create archived timeline starting from max active instant time
+      // This will be empty if all archived instants are older than active timeline
+      val timeline = if (maxActiveInstantTime.nonEmpty) {
+        metaClient.getTableFormat().getTimelineFactory()
+          .createArchivedTimeline(metaClient, maxActiveInstantTime)
+//        metaClient.getArchivedTimeline()
+      } else {
+        // If no active instants, create empty timeline
+        metaClient.getArchivedTimeline()
+      }
+      // Load the required details with appropriate LoadMode (METADATA for commits, PLAN for compactions)
+      // Note: loadCompletedInstantDetailsInMemory may have already loaded METADATA via constructor,
+      // but we call it again to ensure we have the data for the specified time range or limit.
+      if (startTime.nonEmpty && endTime.nonEmpty) {
+        timeline.loadCompletedInstantDetailsInMemory(startTime, endTime)
+        timeline.loadCompactionDetailsInMemory(startTime, endTime)
+      } else {
+        timeline.loadCompletedInstantDetailsInMemory(limit)
+        timeline.loadCompactionDetailsInMemory(limit)
+      }
+      val rollbackInfoMap = getRolledBackInstantInfo(timeline, metaClient)
+      (timeline, rollbackInfoMap)
+    } else {
+      (null, Map.empty[String, List[String]])
+    }
+
+    val activeEntries = getTimelineEntriesFromTimeline(
+      metaClient.getActiveTimeline, "ACTIVE", metaClient, instantInfoMap, activeRollbackInfoMap, limit, startTime, endTime
+    )
+
+    val finalEntries = if (showArchived) {
+      val archivedEntries = getTimelineEntriesFromTimeline(
+        archivedTimeline, "ARCHIVED", metaClient, instantInfoMap, archivedRollbackInfoMap, limit, startTime, endTime
+      )
+      val combinedEntries = (activeEntries ++ archivedEntries)
+        .sortWith((a, b) => {
+          val timePriorityOrder = a.getString(0).compareTo(b.getString(0))
+          if (timePriorityOrder != 0) {
+            timePriorityOrder > 0
+          } else {
+            val statePriorityOrder = Map("COMPLETED" -> 3, "INFLIGHT" -> 2, "REQUESTED" -> 1)
+            val state1 = a.getString(2)
+            val state2 = b.getString(2)
+            val priority1 = statePriorityOrder.getOrElse(state1, 0)
+            val priority2 = statePriorityOrder.getOrElse(state2, 0)
+            priority1 > priority2
+          }
+        })
+
+      if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+        combinedEntries
+      } else {
+        combinedEntries.take(limit)
+      }
+    } else {
+      if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+        activeEntries
+      } else {
+        activeEntries.take(limit)
+      }
+    }
+
+    finalEntries
+  }
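
The combined sort above orders entries newest-first by instant timestamp and breaks ties by state priority (COMPLETED > INFLIGHT > REQUESTED). A standalone sketch of the same comparator, with plain `{instantTime, state}` string pairs standing in for Spark `Row`s:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Standalone sketch (not Hudi code) of the comparator used in getTimelineEntries:
// newest instant first, ties broken by state priority.
public class TimelineSort {
    static final Map<String, Integer> STATE_PRIORITY =
        Map.of("COMPLETED", 3, "INFLIGHT", 2, "REQUESTED", 1);

    // Each entry is {instantTime, state}.
    static void sortEntries(List<String[]> entries) {
        entries.sort((a, b) -> {
            int byTime = b[0].compareTo(a[0]); // descending timestamp (newest first)
            if (byTime != 0) {
                return byTime;
            }
            // Tie-break: COMPLETED > INFLIGHT > REQUESTED; unknown states sort last.
            return STATE_PRIORITY.getOrDefault(b[1], 0) - STATE_PRIORITY.getOrDefault(a[1], 0);
        });
    }

    public static void main(String[] args) {
        List<String[]> entries = new ArrayList<>(List.of(
            new String[]{"20251201000000", "REQUESTED"},
            new String[]{"20251202000000", "COMPLETED"},
            new String[]{"20251201000000", "COMPLETED"}));
        sortEntries(entries);
        for (String[] e : entries) {
            System.out.println(e[0] + " " + e[1]);
        }
    }
}
```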
+
+  /**
+   * Extracts timeline entries from a specific timeline (active or archived) and converts them to Row objects.
+   *
+   * This method processes instants from the given timeline by:
+   * 1. Filtering instants by time range using timeline's built-in filtering methods if `startTime` or `endTime` are provided
+   * 2. Retrieving filtered instants from the timeline
+   * 3. Sorting filtered instants by timestamp in descending order (newest first)
+   * 4. Applying limit if time range is not specified (when time range is specified, all matching entries are returned)
+   * 5. Converting each instant to a Row using `createTimelineEntry`
+   *
+   * @param timeline The timeline to extract entries from (can be active or archived timeline)
+   * @param timelineType The type of timeline: "ACTIVE" or "ARCHIVED". This is used to mark entries in the output
+   * @param metaClient The Hudi table metadata client for accessing timeline metadata
+   * @param instantInfoMap Map of instant timestamps to their state information and modification times.
+   *                      Used for formatting dates in the output. Only contains active timeline entries.
+   * @param rollbackInfoMap Map of instant timestamps to list of rollback instants that rolled them back.
+   *                       Used for populating rollback information in the output
+   * @param limit Maximum number of entries to return. Only applied if time range is not specified
+   * @param startTime Optional start timestamp for filtering (format: yyyyMMddHHmmss).
+   *                 If non-empty, only instants with timestamp >= startTime are included
+   * @param endTime Optional end timestamp for filtering (format: yyyyMMddHHmmss).
+   *               If non-empty, only instants with timestamp <= endTime are included
+   * @return Sequence of Row objects representing timeline entries from the specified timeline,
+   *         sorted by timestamp (descending), limited if applicable
+   *
+   * @note Time range filtering is inclusive on both ends (startTime <= instantTime <= endTime)
+   * @note If both `startTime` and `endTime` are provided, uses `findInstantsInClosedRange`
+   * @note If only `startTime` is provided, uses `findInstantsAfterOrEquals`
+   * @note If only `endTime` is provided, uses `findInstantsBeforeOrEquals`
+   * @note If both `startTime` and `endTime` are provided, the limit is ignored and all matching entries are returned
+   * @note If neither `startTime` nor `endTime` are provided, the limit is applied to the sorted results
+   *
+   * @see [[createTimelineEntry]] for the conversion of HoodieInstant to Row
+   * @see [[getTimelineEntries]] for the main method that orchestrates timeline entry retrieval
+   * @see [[HoodieTimeline.findInstantsInClosedRange]] for range filtering implementation
+   */
+  private def getTimelineEntriesFromTimeline(timeline: HoodieTimeline,
+                                             timelineType: String,
+                                             metaClient: HoodieTableMetaClient,
+                                             instantInfoMap: Map[String, Map[HoodieInstant.State, HoodieInstantWithModTime]],
+                                             rollbackInfoMap: Map[String, List[String]],
+                                             limit: Int,
+                                             startTime: String,
+                                             endTime: String): Seq[Row] = {
+
+    // Use timeline's built-in filtering methods for better performance and consistency
+    val filteredTimeline = {
+      val startTimeTrimmed = startTime.trim
+      val endTimeTrimmed = endTime.trim
+      if (startTimeTrimmed.nonEmpty && endTimeTrimmed.nonEmpty) {
+        // Both start and end time provided: use closed range [startTime, endTime]
+        timeline.findInstantsInClosedRange(startTimeTrimmed, endTimeTrimmed)
+      } else if (startTimeTrimmed.nonEmpty) {
+        // Only start time provided: get instants >= startTime
+        timeline.findInstantsAfterOrEquals(startTimeTrimmed)
+      } else if (endTimeTrimmed.nonEmpty) {
+        // Only end time provided: get instants <= endTime
+        timeline.findInstantsBeforeOrEquals(endTimeTrimmed)
+      } else {
+        // No time filtering: use original timeline
+        timeline
+      }
+    }
+
+    val instants = filteredTimeline.getInstants.iterator().asScala.toSeq
+
+    // Sort by timestamp in descending order (newest first)
+    val sortedInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime())
+
+    // Apply limit only if time range is not fully specified
+    val limitedInstants = if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+      sortedInstants
+    } else {
+      sortedInstants.take(limit)
+    }
+
+    limitedInstants.map { instant =>
+      createTimelineEntry(instant, timelineType, metaClient, instantInfoMap, rollbackInfoMap)
+    }
+  }
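
The branch selection above (closed range, after-or-equals, before-or-equals) reduces, for fixed-width `yyyyMMddHHmmss` instant times, to inclusive lexicographic string comparisons. A standalone sketch under that assumption, using plain strings rather than Hudi's timeline API:

```java
import java.util.List;
import java.util.stream.Collectors;

// Standalone sketch (not Hudi code): inclusive time-range filtering over
// fixed-width yyyyMMddHHmmss instant times, mirroring the branch logic above.
// An empty start or end means "no bound on that side".
public class RangeFilter {
    static List<String> filterInstants(List<String> instants, String start, String end) {
        return instants.stream()
            .filter(ts -> start.trim().isEmpty() || ts.compareTo(start.trim()) >= 0)
            .filter(ts -> end.trim().isEmpty() || ts.compareTo(end.trim()) <= 0)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> instants = List.of(
            "20251130120000", "20251201000000", "20251215093000", "20260101000000");
        // Closed range keeps both endpoints (inclusive on both ends).
        System.out.println(filterInstants(instants, "20251201000000", "20251215093000"));
        // Only an end bound: everything up to and including it.
        System.out.println(filterInstants(instants, "", "20251201000000"));
    }
}
```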
+
+  private def getRolledBackInstantInfo(timeline: HoodieTimeline, metaClient: HoodieTableMetaClient): Map[String, List[String]] = {
+    val rollbackInfoMap = scala.collection.mutable.Map[String, scala.collection.mutable.ListBuffer[String]]()
+
+    val rollbackInstants = timeline.filter(instant =>
+      HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction)).getInstants
+
+    rollbackInstants.asScala.foreach { rollbackInstant =>
+      try {
+        if (rollbackInstant.isInflight) {
+          val instantToUse = metaClient.createNewInstant(
+            HoodieInstant.State.REQUESTED, rollbackInstant.getAction, rollbackInstant.requestedTime())
+          val metadata = timeline.readRollbackPlan(instantToUse)
+          val rolledBackInstant = metadata.getInstantToRollback.getCommitTime
+          rollbackInfoMap.getOrElseUpdate(rolledBackInstant, scala.collection.mutable.ListBuffer[String]())
+            .append(rollbackInstant.requestedTime())
+        } else {
+          val metadata = timeline.readRollbackMetadata(rollbackInstant)
+          metadata.getCommitsRollback.asScala.foreach { instant =>
+            rollbackInfoMap.getOrElseUpdate(instant, scala.collection.mutable.ListBuffer[String]())
+              .append(rollbackInstant.requestedTime())
+          }
+        }
+      } catch {
+        case _: Exception => // Skip invalid rollback instants
+      }
+    }
+    rollbackInfoMap.map { case (k, v) => k -> v.toList }.toMap
+  }
+
+  private def getRollbackInfo(instant: HoodieInstant, timeline: HoodieTimeline, rollbackInfoMap: Map[String, List[String]], metaClient: HoodieTableMetaClient): String = {

Review Comment:
   can we add javadocs for both methods? they look similar
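
   Both methods work with the same inverted index: `getRolledBackInstantInfo` builds commit -> [rollbacks that rolled it back], and `getRollbackInfo` reads it. A standalone sketch of that inversion, with plain pairs standing in for the Hudi rollback plan/metadata:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch (not Hudi code): invert (rollbackTs -> rolled-back commits)
// into (commitTs -> rollbacks that rolled it back), as getRolledBackInstantInfo does.
public class RollbackIndex {
    static Map<String, List<String>> invert(Map<String, List<String>> rollbacks) {
        Map<String, List<String>> index = new HashMap<>();
        rollbacks.forEach((rollbackTs, commits) ->
            commits.forEach(commitTs ->
                index.computeIfAbsent(commitTs, k -> new ArrayList<>()).add(rollbackTs)));
        return index;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so rollbacks are recorded in timeline order.
        Map<String, List<String>> rollbacks = new LinkedHashMap<>();
        rollbacks.put("20251202000000", List.of("20251201000000"));
        rollbacks.put("20251203000000", List.of("20251201000000", "20251202120000"));
        // Commit 20251201000000 was rolled back twice.
        System.out.println(invert(rollbacks).get("20251201000000"));
    }
}
```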



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineProcedure.scala:
##########
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.HoodieSparkUtils
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestShowTimelineProcedure extends HoodieSparkSqlTestBase {

Review Comment:
   let's try to cover all cases listed below 
   
   
   both v1 and v2 
   
   1. limit 50. 
   2. both active and archived, limit 20, where active contains 40. 
   3. both active and archived, limit 20, where active contains 10. 
   4. start within active timeline. 
   5. end within active timeline. 
   6. start and end within active timeline. 
   7. start in archived, but archived not explicitly enabled. 
   8. same as 7, with archived enabled. 
   9. start in archived and end in active timeline. archived not enabled. 
   10. start in archived and end in active timeline. archived enabled. 
   11. start and end in archived. archived not enabled. 
   12. start and end in archived. archived enabled. 
   
   
   completed commits. 
   inflight commits. 
   completed dc (mor). 
   inflight dc (mor). 
   clean commits. 
   clean inflight commits. 
   completed compaction. 
   inflight compaction. 
   completed replace commits -> clustering. 
   inflight replace commits -> clustering. 
   completed replace commits -> insert overwrite. 
   inflight replace commits -> insert overwrite. 
   completed rollback. 
   pending rollback. 
   
   In the active timeline, we should have a mix of inflight and completed, but in the archived timeline we can only have completed, and that's fine. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to