This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 362b340415de [HUDI-9703] Support Regex Config in RunClusteringProcedure (#13761)
362b340415de is described below

commit 362b340415de9ca2a1199350b3727f635294dcc6
Author: fhan <[email protected]>
AuthorDate: Tue Sep 2 10:57:00 2025 +0800

    [HUDI-9703] Support Regex Config in RunClusteringProcedure (#13761)
    
    * [HUDI-9703] Support Regex Config in RunClusteringProcedure
    
    * rename selected_partitions_reg to partition_regex_pattern
    
    * Update TestClusteringProcedure.scala
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
    Co-authored-by: YueZhang <[email protected]>
---
 .../procedures/RunClusteringProcedure.scala        | 11 ++++--
 .../hudi/procedure/TestClusteringProcedure.scala   | 39 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 624f34ecf5c6..df2a8c32b0eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -59,7 +59,8 @@ class RunClusteringProcedure extends BaseProcedure
     ProcedureParameter.optional(7, "options", DataTypes.StringType),
     ProcedureParameter.optional(8, "instants", DataTypes.StringType),
    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType),
-    ProcedureParameter.optional(10, "limit", DataTypes.IntegerType)
+    ProcedureParameter.optional(10, "partition_regex_pattern", DataTypes.StringType),
+    ProcedureParameter.optional(11, "limit", DataTypes.IntegerType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -86,7 +87,8 @@ class RunClusteringProcedure extends BaseProcedure
     val options = getArgValueOrDefault(args, PARAMETERS(7))
     val specificInstants = getArgValueOrDefault(args, PARAMETERS(8))
     val parts = getArgValueOrDefault(args, PARAMETERS(9))
-    val limit = getArgValueOrDefault(args, PARAMETERS(10))
+    val partsReg = getArgValueOrDefault(args, PARAMETERS(10))
+    val limit = getArgValueOrDefault(args, PARAMETERS(11))
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = createMetaClient(jsc, basePath)
@@ -100,6 +102,11 @@ class RunClusteringProcedure extends BaseProcedure
 
     if (selectedPartitions == null) {
       logInfo("No partition selected")
+      if (partsReg.isDefined) {
+        confs = confs ++ Map(
+          HoodieClusteringConfig.PARTITION_REGEX_PATTERN.key() -> partsReg.get.asInstanceOf[String]
+        )
+      }
     } else if (selectedPartitions.isEmpty) {
       logInfo("No partition matched")
       // scalastyle:off return
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 0ded61f174dd..9e3479ddd53e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -412,6 +412,45 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_clustering Procedure With Partition Pruning Regex") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts bigint,
+             |  sex string,
+             |  addr string
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(sex, addr)
+             | location '$basePath'
+       """.stripMargin)
+        val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
+        // Test partition pruning with single predicate
+        var resultA: Array[Seq[Any]] = Array.empty
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 's1', 'addr1')")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 's2', 'addr1')")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 's1', 'addr2')")
+        // clustering table with partition predicate
+        resultA = spark.sql(s"call run_clustering(table => '$tableName', partition_regex_pattern => 'sex=s1/.*', order => 'ts', show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(resultA.length)
+        assertResult("sex=s1/addr=addr1,sex=s1/addr=addr2")(resultA(0)(3))
+      }
+    }
+  }
+
   test("Test Call run_clustering Procedure with specific instants") {
     withTempDir { tmp =>
       val tableName = generateTableName

Reply via email to