This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 02c1a867d8e0 [HUDI-9726] Support for generic filters for spark 
procedures (#13736)
02c1a867d8e0 is described below

commit 02c1a867d8e034b99bd01c00c5d1e34ede060e17
Author: Vamshi Krishna Kyatham 
<[email protected]>
AuthorDate: Fri Aug 22 16:57:32 2025 -0700

    [HUDI-9726] Support for generic filters for spark procedures (#13736)
---
 .../procedures/HoodieProcedureFilterUtils.scala    | 493 +++++++++++++++++++++
 .../procedures/ShowCleansPlanProcedure.scala       | 120 ++++-
 .../command/procedures/ShowCleansProcedure.scala   |  93 +++-
 .../hudi/procedure/TestShowCleansProcedures.scala  | 137 ++++++
 4 files changed, 839 insertions(+), 4 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureFilterUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureFilterUtils.scala
new file mode 100644
index 000000000000..533c80b92777
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureFilterUtils.scala
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Utility object for filtering procedure results using SQL expressions.
+ *
+ * Supports all Spark SQL data types including:
+ * - Primitive types: Boolean, Byte, Short, Int, Long, Float, Double, String, 
Binary
+ * - Date/Time types: Date, Timestamp, Instant, LocalDate, LocalDateTime
+ * - Decimal types: BigDecimal with precision/scale
+ * - Complex types: Array, Map, Struct (Row)
+ * - Nested combinations of all above types
+ */
+object HoodieProcedureFilterUtils {
+
+  /**
+   * Applies a SQL predicate to procedure output rows and keeps the rows it accepts.
+   *
+   * A null or blank `filterExpression` disables filtering and returns `rows` unchanged.
+   * Otherwise the expression is parsed once with the session's SQL parser and then
+   * checked against every row through `evaluateExpressionOnRow`.
+   *
+   * @param rows             rows to filter
+   * @param filterExpression SQL expression string; null or blank means "no filter"
+   * @param schema           schema describing the columns of `rows`
+   * @param sparkSession     session whose SQL parser parses the expression
+   * @return the rows for which the expression evaluated to true
+   * @throws IllegalArgumentException if parsing or filtering fails with a non-fatal error;
+   *                                  the original error is attached as the cause
+   */
+  def evaluateFilter(rows: Seq[Row], filterExpression: String, schema: StructType, sparkSession: SparkSession): Seq[Row] = {
+    val filterIsBlank = filterExpression == null || filterExpression.trim.isEmpty
+    if (filterIsBlank) {
+      rows
+    } else {
+      try {
+        val predicate = sparkSession.sessionState.sqlParser.parseExpression(filterExpression)
+        rows.filter(evaluateExpressionOnRow(predicate, _, schema))
+      } catch {
+        // Same class of errors scala.util.Try would capture; fatal errors keep propagating.
+        case scala.util.control.NonFatal(cause) =>
+          throw new IllegalArgumentException(
+            s"Failed to parse or evaluate filter expression '$filterExpression': ${cause.getMessage}",
+            cause
+          )
+      }
+    }
+  }
+
+  /**
+   * Evaluates a parsed filter expression against a single row.
+   *
+   * The row is first converted to Catalyst's internal representation. The expression is then
+   * rewritten in three passes: column names found in `schema` are bound to row ordinals, a
+   * fixed set of function names is mapped to Catalyst expressions, and comparisons are passed
+   * through `applyTypeCoercion`. Any non-fatal failure while rewriting or evaluating is
+   * swallowed and counts as "row does not match".
+   *
+   * @param expression parsed (unresolved) filter expression
+   * @param row        external row to test
+   * @param schema     schema of `row`, used to bind column names
+   * @return true only when the expression evaluates to boolean true (or to a value whose
+   *         lower-cased string form is "true"); null, other values and failures give false
+   */
+  private def evaluateExpressionOnRow(expression: Expression, row: Row, schema: StructType): Boolean = {
+    import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+    import org.apache.spark.sql.catalyst.{expressions => exprs}
+    import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType}
+
+    // Converted outside the Try: a conversion error is not swallowed here, it reaches the caller.
+    val catalystRow = convertRowToInternalRow(row, schema)
+
+    Try {
+      // Pass 1: replace column names known to the schema with ordinal-based references.
+      // Names the schema does not know are left unresolved.
+      val withColumnsBound = expression.transform {
+        case column: UnresolvedAttribute =>
+          val ordinal =
+            try Some(schema.fieldIndex(column.name))
+            catch { case _: IllegalArgumentException => None }
+          ordinal.fold[Expression](column) { idx =>
+            val field = schema.fields(idx)
+            exprs.BoundReference(idx, field.dataType, field.nullable)
+          }
+      }
+
+      // Pass 2: map supported function names (matched on the first name part, case-insensitively)
+      // to Catalyst expressions. An unknown name or an unexpected argument count leaves the call
+      // unresolved.
+      val withFunctionsResolved = withColumnsBound.transform {
+        case call: UnresolvedFunction =>
+          (call.nameParts.head.toLowerCase, call.arguments) match {
+            // String functions
+            case ("upper", Seq(arg)) => exprs.Upper(arg)
+            case ("lower", Seq(arg)) => exprs.Lower(arg)
+            case ("length" | "len", Seq(arg)) => exprs.Length(arg)
+            case ("trim", Seq(arg)) => exprs.StringTrim(arg)
+            case ("ltrim", Seq(arg)) => exprs.StringTrimLeft(arg)
+            case ("rtrim", Seq(arg)) => exprs.StringTrimRight(arg)
+            case ("substring" | "substr", Seq(str, pos, len)) => exprs.Substring(str, pos, len)
+            // Numeric functions
+            case ("abs", Seq(arg)) => exprs.Abs(arg)
+            case ("round", Seq(arg)) => exprs.Round(arg, exprs.Literal(0))
+            case ("round", Seq(arg, scale)) => exprs.Round(arg, scale)
+            case ("ceil" | "ceiling", Seq(arg)) => exprs.Ceil(arg)
+            case ("floor", Seq(arg)) => exprs.Floor(arg)
+            // Date/time functions
+            case ("year", Seq(arg)) => exprs.Year(arg)
+            case ("month", Seq(arg)) => exprs.Month(arg)
+            case ("day" | "dayofmonth", Seq(arg)) => exprs.DayOfMonth(arg)
+            case ("hour", Seq(arg)) => exprs.Hour(arg)
+            case ("date_format", Seq(ts, format)) => exprs.DateFormatClass(ts, format)
+            case ("datediff", Seq(end, start)) => exprs.DateDiff(end, start)
+            // Collection functions
+            case ("size" | "array_size", Seq(arg)) => exprs.Size(arg)
+            case ("map_keys", Seq(arg)) => exprs.MapKeys(arg)
+            case ("map_values", Seq(arg)) => exprs.MapValues(arg)
+            case ("array_contains", Seq(arr, elem)) => exprs.ArrayContains(arr, elem)
+            case ("sort_array", Seq(arr)) => exprs.SortArray(arr, exprs.Literal(true))
+            case ("sort_array", Seq(arr, ascending)) => exprs.SortArray(arr, ascending)
+            // Pattern matching
+            case ("like", Seq(str, pattern)) => exprs.Like(str, pattern, '\\')
+            case ("rlike" | "regexp_like", Seq(str, pattern)) => exprs.RLike(str, pattern)
+            case ("regexp_extract", Seq(str, pattern, group)) => exprs.RegExpExtract(str, pattern, group)
+            // Null handling
+            case ("isnull", Seq(arg)) => exprs.IsNull(arg)
+            case ("isnotnull", Seq(arg)) => exprs.IsNotNull(arg)
+            case ("coalesce", args) if args.nonEmpty => exprs.Coalesce(args)
+            // Cast-style functions
+            case ("string", Seq(arg)) => exprs.Cast(arg, StringType)
+            case ("int" | "integer", Seq(arg)) => exprs.Cast(arg, IntegerType)
+            case ("long" | "bigint", Seq(arg)) => exprs.Cast(arg, LongType)
+            case ("double", Seq(arg)) => exprs.Cast(arg, DoubleType)
+            case _ => call
+          }
+      }
+
+      // Pass 3: bottom-up, give binary comparisons a chance to align their operand types.
+      val evaluable = withFunctionsResolved.transformUp {
+        case cmp: exprs.EqualTo =>
+          applyTypeCoercion(cmp.left, cmp.right, exprs.EqualTo.apply, cmp)
+        case cmp: exprs.GreaterThan =>
+          applyTypeCoercion(cmp.left, cmp.right, exprs.GreaterThan.apply, cmp)
+        case cmp: exprs.GreaterThanOrEqual =>
+          applyTypeCoercion(cmp.left, cmp.right, exprs.GreaterThanOrEqual.apply, cmp)
+        case cmp: exprs.LessThan =>
+          applyTypeCoercion(cmp.left, cmp.right, exprs.LessThan.apply, cmp)
+        case cmp: exprs.LessThanOrEqual =>
+          applyTypeCoercion(cmp.left, cmp.right, exprs.LessThanOrEqual.apply, cmp)
+      }
+
+      evaluable.eval(catalystRow) match {
+        case null => false
+        case flag: Boolean => flag
+        case other => other.toString.toLowerCase == "true"
+      }
+    }.getOrElse(false)
+  }
+
+  /**
+   * Builds a Catalyst `GenericInternalRow` from an external `Row`. Each non-null column is
+   * converted with `convertValueToInternal` according to its field type in `schema`;
+   * null columns stay null.
+   */
+  private def convertRowToInternalRow(row: Row, schema: StructType): GenericInternalRow = {
+    val fields = schema.fields
+    val converted = Array.tabulate[Any](fields.length) { ordinal =>
+      if (row.isNullAt(ordinal)) null else convertValueToInternal(row.get(ordinal), fields(ordinal).dataType)
+    }
+    new GenericInternalRow(converted)
+  }
+
+  /**
+   * Converts an external (Row-level) value into the representation Catalyst expressions
+   * expect when they are evaluated against an `InternalRow`.
+   *
+   * Strings and UUIDs become `UTF8String`; dates and timestamps are converted through
+   * `DateTimeUtils`; big decimals become `Decimal` with the precision and scale of `dataType`;
+   * arrays, lists and sequences become `GenericArrayData`; maps become `ArrayBasedMapData`;
+   * nested rows become `GenericInternalRow`. Primitive values, byte arrays, values that are
+   * already in Catalyst form, and unrecognised values are returned as they are.
+   *
+   * @param value    external value, may be null
+   * @param dataType declared type of the value; consulted for decimals and complex types,
+   *                 where a mismatch with the runtime value raises `ClassCastException`
+   * @return the converted value, or null when `value` is null
+   */
+  private def convertValueToInternal(value: Any, dataType: DataType): Any = {
+    import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+    import org.apache.spark.sql.types._
+
+    // Catalyst array expressions (size, array_contains, sort_array, ...) cast their input to
+    // ArrayData, so collection values must be wrapped instead of being handed over as a raw
+    // Scala array.
+    def toArrayData(elements: Iterable[Any]): GenericArrayData = {
+      val elementType = dataType.asInstanceOf[ArrayType].elementType
+      val converted: Array[Any] = elements.map(convertValueToInternal(_, elementType)).toArray
+      new GenericArrayData(converted)
+    }
+
+    // Keys and values are converted independently and paired up again by position.
+    def toMapData(entries: scala.collection.Map[_, _]): ArrayBasedMapData = {
+      val mapType = dataType.asInstanceOf[MapType]
+      val convertedKeys = entries.keys.map(convertValueToInternal(_, mapType.keyType)).toArray
+      val convertedValues = entries.values.map(convertValueToInternal(_, mapType.valueType)).toArray
+      ArrayBasedMapData(convertedKeys, convertedValues)
+    }
+
+    def toDecimalType: DecimalType = dataType.asInstanceOf[DecimalType]
+
+    value match {
+      case null => null
+      case s: String => UTF8String.fromString(s)
+      case ts: java.sql.Timestamp => DateTimeUtils.fromJavaTimestamp(ts)
+      case date: java.sql.Date => DateTimeUtils.fromJavaDate(date)
+      case instant: java.time.Instant => DateTimeUtils.instantToMicros(instant)
+      case localDate: java.time.LocalDate => DateTimeUtils.localDateToDays(localDate)
+      case localDateTime: java.time.LocalDateTime => DateTimeUtils.localDateTimeToMicros(localDateTime)
+      case byte: Byte => byte
+      case short: Short => short
+      case int: Int => int
+      case long: Long => long
+      case float: Float => float
+      case double: Double => double
+      case decimal: java.math.BigDecimal =>
+        Decimal(decimal, toDecimalType.precision, toDecimalType.scale)
+      case decimal: scala.math.BigDecimal =>
+        Decimal(decimal, toDecimalType.precision, toDecimalType.scale)
+      case bool: Boolean => bool
+      // Must stay ahead of the generic Array case: a byte array is passed through unchanged.
+      case bytes: Array[Byte] => bytes
+      case array: Array[_] => toArrayData(array.toSeq)
+      case list: java.util.List[_] => toArrayData(list.asScala)
+      case seq: Seq[_] => toArrayData(seq)
+      case map: java.util.Map[_, _] => toMapData(map.asScala)
+      case map: scala.collection.Map[_, _] => toMapData(map)
+      case row: Row => convertRowToInternalRow(row, dataType.asInstanceOf[StructType])
+      // Values that already use Catalyst's internal representation.
+      case utf8: UTF8String => utf8
+      case internalRow: org.apache.spark.sql.catalyst.InternalRow => internalRow
+      case mapData: org.apache.spark.sql.catalyst.util.MapData => mapData
+      case arrayData: org.apache.spark.sql.catalyst.util.ArrayData => arrayData
+      case decimal: Decimal => decimal
+      case uuid: java.util.UUID => UTF8String.fromString(uuid.toString)
+      case other => other
+    }
+  }
+
+  /**
+   * Checks that a filter expression parses and only references columns present in `schema`.
+   *
+   * @param filterExpression SQL expression string; null or blank is accepted as valid
+   * @param schema           schema whose field names are the allowed column references
+   * @param sparkSession     session whose SQL parser parses the expression
+   * @return `Right(())` when the expression is acceptable, otherwise `Left` with a message
+   *         describing the unknown columns or the parse failure
+   */
+  def validateFilterExpression(filterExpression: String, schema: StructType, sparkSession: SparkSession): Either[String, Unit] = {
+    val nothingToValidate = filterExpression == null || filterExpression.trim.isEmpty
+    if (nothingToValidate) {
+      Right(())
+    } else {
+      try {
+        val parsed = sparkSession.sessionState.sqlParser.parseExpression(filterExpression)
+        val knownColumns = schema.fieldNames.toSet
+        val unknownColumns = extractColumnReferences(parsed) -- knownColumns
+        Either.cond(
+          unknownColumns.isEmpty,
+          (),
+          s"Invalid column references: ${unknownColumns.mkString(", ")}. Available columns: ${knownColumns.mkString(", ")}"
+        )
+      } catch {
+        // Same class of errors scala.util.Try would capture; fatal errors keep propagating.
+        case scala.util.control.NonFatal(cause) => Left(s"Invalid filter expression: ${cause.getMessage}")
+      }
+    }
+  }
+
+  /**
+   * Collects the names of every column the expression tree refers to, whether the reference
+   * is still an `UnresolvedAttribute` or already an `AttributeReference`.
+   */
+  private def extractColumnReferences(expression: Expression): Set[String] = {
+    val referencedNames = expression.collect {
+      case resolved: org.apache.spark.sql.catalyst.expressions.AttributeReference => resolved.name
+      case pending: UnresolvedAttribute => pending.name
+    }
+    referencedNames.toSet
+  }
+
+  /**
+   * Makes a comparison between a LongType column and an IntegerType literal well-typed.
+   *
+   * The integer literal is widened to LongType and the column is left untouched. Narrowing
+   * the column to Int instead would wrap values outside the Int range and compare the wrong
+   * number. Both operand orders are handled (`col > 10` and `10 < col`). Every other operand
+   * combination returns `original` unchanged.
+   *
+   * @param left        left operand of the comparison
+   * @param right       right operand of the comparison
+   * @param constructor rebuilds the same kind of comparison from two operands
+   * @param original    the comparison as parsed, returned when no coercion applies
+   * @return the comparison with the literal cast to LongType, or `original`
+   */
+  private def applyTypeCoercion[T <: Expression](
+      left: Expression,
+      right: Expression,
+      constructor: (Expression, Expression) => T,
+      original: T): T = {
+    import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast, Literal}
+    import org.apache.spark.sql.types.{IntegerType, LongType}
+
+    def isLongColumn(operand: Expression): Boolean = operand match {
+      case column: BoundReference => column.dataType == LongType
+      case _ => false
+    }
+
+    def isIntLiteral(operand: Expression): Boolean = operand match {
+      case literal: Literal => literal.dataType == IntegerType
+      case _ => false
+    }
+
+    // Int -> Long is lossless, so the comparison result is exact for every column value.
+    def widen(literal: Expression): Expression = Cast(literal, LongType)
+
+    if (isLongColumn(left) && isIntLiteral(right)) {
+      constructor(left, widen(right))
+    } else if (isIntLiteral(left) && isLongColumn(right)) {
+      constructor(widen(left), right)
+    } else {
+      original
+    }
+  }
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
index fc0ef22b1f57..9b599a2bc5cc 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
@@ -31,6 +31,106 @@ import java.util.function.Supplier
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
+/**
+ * Spark SQL procedure to show clean plans for a Hudi table.
+ *
+ * This procedure displays information about clean operations that have been 
planned but not yet executed.
+ * Clean plans contain metadata about which files are scheduled for deletion 
during the next clean operation.
+ *
+ * == Parameters ==
+ * - `table`: Required. The name of the Hudi table to query
+ * - `limit`: Optional. Maximum number of clean plans to return (default: 10)
+ * - `showArchived`: Optional. Whether to include archived clean plans 
(default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty 
string)
+ *
+ * == Output Schema ==
+ * - `plan_time`: Timestamp when the clean plan was created
+ * - `state`: Current state of the clean plan (REQUESTED, INFLIGHT, COMPLETED)
+ * - `action`: The action type (always 'clean')
+ * - `earliest_instant_to_retain`: The earliest commit that will be retained 
after cleaning
+ * - `last_completed_commit_timestamp`: The last completed commit at the time 
of planning
+ * - `policy`: The clean policy used (e.g., KEEP_LATEST_COMMITS, 
KEEP_LATEST_FILE_VERSIONS)
+ * - `version`: Version of the clean plan metadata format
+ * - `total_partitions_to_clean`: Number of partitions that have files to be 
cleaned
+ * - `total_partitions_to_delete`: Number of partitions that will have files 
deleted
+ * - `extra_metadata`: Additional metadata associated with the clean plan
+ *
+ * == Error Handling ==
+ * - Throws `IllegalArgumentException` for invalid filter expressions
+ * - Throws `HoodieException` for table access issues
+ * - Returns empty result set if no clean plans match the criteria
+ *
+ * == Filter Support ==
+ * The `filter` parameter supports SQL expressions that can be applied to any 
output column.
+ * The filter uses Spark SQL syntax and supports various data types and 
operations.
+ *
+ * === Filter Examples ===
+ * {{{
+ * -- Show clean plans created after a specific timestamp
+ * CALL show_clean_plans(
+ *   table => 'my_table',
+ *   filter => "plan_time > '20241201000000'"
+ * )
+ *
+ * -- Show clean plans that will clean many partitions
+ * CALL show_clean_plans(
+ *   table => 'my_table',
+ *   filter => "total_partitions_to_clean > 10"
+ * )
+ *
+ * -- Show recent clean plans with complex conditions
+ * CALL show_clean_plans(
+ *   table => 'my_table',
+ *   filter => "plan_time > '20241201000000' AND total_partitions_to_delete 
BETWEEN 1 AND 100"
+ * )
+ *
+ * -- Show clean plans using string functions and policy filters
+ * CALL show_clean_plans(
+ *   table => 'my_table',
+ *   filter => "LENGTH(earliest_instant_to_retain) > 10 AND policy = 
'KEEP_LATEST_COMMITS'"
+ * )
+ *
+ * -- Show clean plans with null checks and state filtering
+ * CALL show_clean_plans(
+ *   table => 'my_table',
+ *   filter => "last_completed_commit_timestamp IS NOT NULL AND state = 
'COMPLETED'"
+ * )
+ *
+ * -- Show clean plans using IN operator for states
+ * CALL show_clean_plans(
+ *   table => 'my_table',
+ *   filter => "state IN ('REQUESTED', 'INFLIGHT') AND 
total_partitions_to_clean > 0"
+ * )
+ * }}}
+ *
+ * == Usage Examples ==
+ * {{{
+ * -- Basic usage: Show last 10 clean plans
+ * CALL show_clean_plans(table => 'hudi_table_2')
+ *
+ * -- Show more results with custom limit
+ * CALL show_clean_plans(table => 'hudi_table_2', limit => 50)
+ *
+ * -- Include archived clean plans
+ * CALL show_clean_plans(table => 'hudi_table_2', showArchived => true)
+ *
+ * -- Filter for recent clean plans
+ * CALL show_clean_plans(
+ *   table => 'hudi_table_2',
+ *   filter => "plan_time > '20241201000000'"
+ * )
+ *
+ * -- Show clean plans that will clean many partitions
+ * CALL show_clean_plans(
+ *   table => 'hudi_table_2',
+ *   filter => "total_partitions_to_clean > 5",
+ *   limit => 20
+ * )
+ * }}}
+ *
+ * @see [[ShowCleansProcedure]] for information about completed clean 
operations
+ * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax
+ */
 class ShowCleansPlanProcedure extends BaseProcedure with ProcedureBuilder with 
SparkAdapterSupport with Logging {
 
   import ShowCleansPlanProcedure._
@@ -45,10 +145,19 @@ class ShowCleansPlanProcedure extends BaseProcedure with 
ProcedureBuilder with S
     val tableName = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String]
     val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
     val showArchived = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Boolean]
+    val filter = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[String]
 
     validateInputs(tableName, limit)
 
-    Try {
+    if (filter != null && filter.trim.nonEmpty) {
+      HoodieProcedureFilterUtils.validateFilterExpression(filter, outputType, 
sparkSession) match {
+        case Left(errorMessage) =>
+          throw new IllegalArgumentException(s"Invalid filter expression: 
$errorMessage")
+        case Right(_) => // Validation passed, continue
+      }
+    }
+
+    val rows = Try {
       val hoodieCatalogTable = 
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, tableName)
       val metaClient = createMetaClient(jsc, hoodieCatalogTable.tableLocation)
       getCleanerPlans(metaClient, limit, showArchived)
@@ -59,6 +168,12 @@ class ShowCleansPlanProcedure extends BaseProcedure with 
ProcedureBuilder with S
         logError(errorMsg, exception)
         throw new HoodieException(s"$errorMsg: ${exception.getMessage}", 
exception)
     }
+
+    if (filter != null && filter.trim.nonEmpty) {
+      HoodieProcedureFilterUtils.evaluateFilter(rows, filter, outputType, 
sparkSession)
+    } else {
+      rows
+    }
   }
 
   override def build: Procedure = new ShowCleansPlanProcedure()
@@ -184,7 +299,8 @@ object ShowCleansPlanProcedure {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType, 
false)
+    ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(3, "filter", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
index cd918c832e3e..1e560f7ecd5d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
@@ -30,12 +30,88 @@ import java.util.function.Supplier
 
 import scala.collection.JavaConverters._
 
+/**
+ * Spark SQL procedure to show completed clean operations for a Hudi table.
+ *
+ * This procedure displays information about clean operations that have been 
executed.
+ * Clean operations remove old file versions to reclaim storage space and 
maintain table performance.
+ *
+ * == Parameters ==
+ * - `table`: Required. The name of the Hudi table to query
+ * - `limit`: Optional. Maximum number of clean operations to return (default: 
10)
+ * - `showArchived`: Optional. Whether to include archived clean operations 
(default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty 
string)
+ *
+ * == Output Schema ==
+ * - `clean_time`: Timestamp when the clean operation was performed
+ * - `state_transition_time`: Time when the clean transitioned to completed 
state
+ * - `action`: The action type (always 'clean')
+ * - `start_clean_time`: When the clean operation started
+ * - `time_taken_in_millis`: Duration of the clean operation in milliseconds
+ * - `total_files_deleted`: Total number of files deleted during the clean
+ * - `earliest_commit_to_retain`: The earliest commit that was retained
+ * - `last_completed_commit_timestamp`: The last completed commit at clean time
+ * - `version`: Version of the clean operation metadata
+ * - Additional partition-level metadata columns when using 
`show_cleans_metadata`
+ *
+ * == Error Handling ==
+ * - Throws `IllegalArgumentException` for invalid filter expressions
+ * - Throws `HoodieException` for table access issues
+ * - Returns empty result set if no clean operations match the criteria
+ *
+ * == Filter Support ==
+ * The `filter` parameter supports SQL expressions for filtering results.
+ *
+ * === Common Filter Examples ===
+ * {{{
+ * -- Show cleans that deleted many files
+ * CALL show_cleans(
+ *   table => 'my_table',
+ *   filter => "total_files_deleted > 100"
+ * )
+ *
+ * -- Show recent clean operations
+ * CALL show_cleans(
+ *   table => 'my_table',
+ *   filter => "clean_time > '20231201000000'"
+ * )
+ *
+ * -- Show slow clean operations
+ * CALL show_cleans(
+ *   table => 'my_table',
+ *   filter => "time_taken_in_millis > 60000"
+ * )
+ * }}}
+ *
+ * == Some Usage Examples ==
+ * {{{
+ * -- Basic usage: Show last 10 completed cleans
+ * CALL show_cleans(table => 'hudi_table_1')
+ *
+ * -- Show clean operations with partition metadata
+ * CALL show_cleans_metadata(table => 'hudi_table_1')
+ *
+ * -- Include archived clean operations
+ * CALL show_cleans(table => 'hudi_table_1', showArchived => true)
+ *
+ * -- Filter for recent cleans that deleted at least one file
+ * CALL show_cleans(
+ *   table => 'hudi_table_1',
+ *   filter => "clean_time > '20231201000000' AND total_files_deleted > 0"
+ * )
+ * }}}
+ *
+ * @param includePartitionMetadata Whether to include partition-level metadata 
in output
+ * @see [[ShowCleansPlanProcedure]] for information about planned clean 
operations
+ * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax
+ */
 class ShowCleansProcedure(includePartitionMetadata: Boolean) extends 
BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType, 
false)
+    ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(3, "filter", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -75,6 +151,15 @@ class ShowCleansProcedure(includePartitionMetadata: 
Boolean) extends BaseProcedu
     val table = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String]
     val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
     val showArchived = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Boolean]
+    val filter = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[String]
+
+    if (filter != null && filter.trim.nonEmpty) {
+      HoodieProcedureFilterUtils.validateFilterExpression(filter, outputType, 
sparkSession) match {
+        case Left(errorMessage) =>
+          throw new IllegalArgumentException(s"Invalid filter expression: 
$errorMessage")
+        case Right(_) => // Validation passed, continue
+      }
+    }
 
     val hoodieCatalogTable = 
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
@@ -97,7 +182,11 @@ class ShowCleansProcedure(includePartitionMetadata: 
Boolean) extends BaseProcedu
     } else {
       activeResults
     }
-    finalResults
+    if (filter != null && filter.trim.nonEmpty) {
+      HoodieProcedureFilterUtils.evaluateFilter(finalResults, filter, 
outputType, sparkSession)
+    } else {
+      finalResults
+    }
   }
 
   override def build: Procedure = new 
ShowCleansProcedure(includePartitionMetadata)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
index 0b97b41e6dd4..6030b931b0a6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
@@ -84,6 +84,13 @@ class TestShowCleansProcedures extends 
HoodieSparkProcedureTestBase {
           val planTime = plan.getString(0)
           assert(planTime.nonEmpty && planTime.toLong > 0, "Plan time should 
be a valid timestamp")
         }
+        val sortedPlans = secondCleanPlans.sortBy(_.getString(0))
+        val actualFirstCleanTime = sortedPlans(0).getString(0)
+        val startTimeStr = (actualFirstCleanTime.toLong + 1000).toString
+        val afterStartFilter = spark.sql(s"""call show_clean_plans(table => 
'$tableName', filter => "plan_time > '$startTimeStr'")""")
+        afterStartFilter.show(false)
+        val afterStartRows = afterStartFilter.collect()
+        assertResult(afterStartRows.length)(1)
       }
     }
   }
@@ -374,4 +381,134 @@ class TestShowCleansProcedures extends 
HoodieSparkProcedureTestBase {
       spark.sql(s"call show_cleans_metadata(table => 
'$nonExistentTable')").collect()
     }
   }
+
+  test("Test cleaning with some complex filters") {
+    withSQLConf("hoodie.clean.automatic" -> "false", 
"hoodie.parquet.max.file.size" -> "10000") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        if (HoodieSparkUtils.isSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled = false")
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             | ) using hudi
+             | location '${tmp.getCanonicalPath}'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'cow',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+        spark.sql(s"update $tableName set price = 11 where id = 1")
+
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)").collect()
+
+        spark.sql(s"update $tableName set price = 12 where id = 1")
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)").collect()
+
+        spark.sql(s"update $tableName set price = 13 where id = 1")
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)").collect()
+
+        val allCleans = spark.sql(s"call show_cleans(table => '$tableName')")
+        allCleans.show(false)
+        val allCleansDf = allCleans.collect()
+        val firstCleanTime = if (allCleansDf.nonEmpty) 
allCleansDf.last.getAs[String]("clean_time") else "0"
+
+        val firstCleanDF = spark.sql(
+          s"""call show_cleans(table => '$tableName', filter => "clean_time = 
'$firstCleanTime' AND action = 'clean'")"""
+        )
+        firstCleanDF.show(false)
+        val firstClean = firstCleanDF.collect()
+
+        val laterCleansDF = spark.sql(
+          s"""call show_cleans(table => '$tableName', filter => "clean_time > 
'$firstCleanTime' AND action = 'clean'")"""
+        )
+        laterCleansDF.show(false)
+        val laterCleans = laterCleansDF.collect()
+
+        val numericFilterDF = spark.sql(
+          s"""call show_cleans(table => '$tableName', filter => 
"total_files_deleted > 0 AND LENGTH(action) > 3")"""
+        )
+        numericFilterDF.show(false)
+        val numericFilter = numericFilterDF.collect()
+
+        assert(firstClean.length == 1, "First clean filter should execute 
successfully")
+        assert(laterCleans.length == allCleansDf.length - 1, "Later cleans 
filter should execute successfully")
+        assert(numericFilter.length == allCleansDf.length, "Numeric filter 
should execute successfully")
+      }
+    }
+  }
+
+  test("Test filter expressions with various data types") {
+    withSQLConf("hoodie.clean.automatic" -> "false", 
"hoodie.parquet.max.file.size" -> "10000") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        if (HoodieSparkUtils.isSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled = false")
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | active boolean,
+             | ts long
+             | ) using hudi
+             | location '${tmp.getCanonicalPath}'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'cow',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'product1', 99.99, true, 
1000)")
+        spark.sql(s"insert into $tableName values(2, 'product2', 149.99, 
false, 2000)")
+
+        spark.sql(s"update $tableName set price = 109.99 where id = 1")
+        spark.sql(s"update $tableName set price = 119.99 where id = 1")
+        spark.sql(s"update $tableName set price = 129.99 where id = 2")
+        spark.sql(s"update $tableName set price = 139.99 where id = 2")
+
+        spark.sql(s"insert into $tableName values(3, 'product3', 199.99, true, 
3000)")
+        spark.sql(s"update $tableName set price = 149.99 where id = 1")
+
+        spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
2)").collect()
+
+        val allCleansDF = spark.sql(s"call show_cleans(table => '$tableName', 
showArchived => true)")
+        allCleansDF.show(false)
+
+        val filterTests = Seq(
+          ("action = 'clean'", "String equality"),
+          ("action LIKE 'clean%'", "String LIKE pattern"),
+          ("UPPER(action) = 'CLEAN'", "String function with equality"),
+          ("LENGTH(clean_time) > 5", "String length function"),
+          ("total_files_deleted >= 0", "Numeric comparison"),
+          ("time_taken_in_millis BETWEEN 0 AND 999999", "Numeric BETWEEN"),
+          ("clean_time IS NOT NULL", "NULL check"),
+          ("action = 'clean' AND total_files_deleted >= 0", "AND logic"),
+          ("total_files_deleted >= 0 OR time_taken_in_millis >= 0", "OR 
logic"),
+          ("NOT (total_files_deleted < 0)", "NOT logic"),
+          ("action IN ('clean', 'commit', 'rollback')", "IN operator")
+        )
+
+        filterTests.foreach { case (filterExpr, description) =>
+          val filteredResult = spark.sql(
+            s"""call show_cleans(table => '$tableName',
+               |filter => "$filterExpr")""".stripMargin
+          ).collect()
+          assert(filteredResult.length > 0, s"Filter '$description' should 
execute successfully")
+        }
+      }
+    }
+  }
 }


Reply via email to