This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 362b340415de [HUDI-9703] Support Regex Config in RunClusteringProcedure (#13761)
362b340415de is described below
commit 362b340415de9ca2a1199350b3727f635294dcc6
Author: fhan <[email protected]>
AuthorDate: Tue Sep 2 10:57:00 2025 +0800
[HUDI-9703] Support Regex Config in RunClusteringProcedure (#13761)
* [HUDI-9703] Support Regex Config in RunClusteringProcedure
* rename selected_partitions_reg to partition_regex_pattern
* Update TestClusteringProcedure.scala
---------
Co-authored-by: fhan <[email protected]>
Co-authored-by: YueZhang <[email protected]>
---
.../procedures/RunClusteringProcedure.scala | 11 ++++--
.../hudi/procedure/TestClusteringProcedure.scala | 39 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 624f34ecf5c6..df2a8c32b0eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -59,7 +59,8 @@ class RunClusteringProcedure extends BaseProcedure
ProcedureParameter.optional(7, "options", DataTypes.StringType),
ProcedureParameter.optional(8, "instants", DataTypes.StringType),
ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType),
- ProcedureParameter.optional(10, "limit", DataTypes.IntegerType)
+ ProcedureParameter.optional(10, "partition_regex_pattern", DataTypes.StringType),
+ ProcedureParameter.optional(11, "limit", DataTypes.IntegerType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -86,7 +87,8 @@ class RunClusteringProcedure extends BaseProcedure
val options = getArgValueOrDefault(args, PARAMETERS(7))
val specificInstants = getArgValueOrDefault(args, PARAMETERS(8))
val parts = getArgValueOrDefault(args, PARAMETERS(9))
- val limit = getArgValueOrDefault(args, PARAMETERS(10))
+ val partsReg = getArgValueOrDefault(args, PARAMETERS(10))
+ val limit = getArgValueOrDefault(args, PARAMETERS(11))
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = createMetaClient(jsc, basePath)
@@ -100,6 +102,11 @@ class RunClusteringProcedure extends BaseProcedure
if (selectedPartitions == null) {
logInfo("No partition selected")
+ if (partsReg.isDefined) {
+ confs = confs ++ Map(
+ HoodieClusteringConfig.PARTITION_REGEX_PATTERN.key() -> partsReg.get.asInstanceOf[String]
+ )
+ }
} else if (selectedPartitions.isEmpty) {
logInfo("No partition matched")
// scalastyle:off return
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 0ded61f174dd..9e3479ddd53e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -412,6 +412,45 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
}
}
+ test("Test Call run_clustering Procedure With Partition Pruning Regex") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts bigint,
+ | sex string,
+ | addr string
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(sex, addr)
+ | location '$basePath'
+ """.stripMargin)
+ val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
+ // Test partition pruning with single predicate
+ var resultA: Array[Seq[Any]] = Array.empty
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 's1', 'addr1')")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 's2', 'addr1')")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 's1', 'addr2')")
+ // clustering table with partition predicate
resultA = spark.sql(s"call run_clustering(table => '$tableName', partition_regex_pattern => 'sex=s1/.*', order => 'ts', show_involved_partition => true)")
+ .collect()
+ .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+ assertResult(1)(resultA.length)
+ assertResult("sex=s1/addr=addr1,sex=s1/addr=addr2")(resultA(0)(3))
+ }
+ }
+ }
+
test("Test Call run_clustering Procedure with specific instants") {
withTempDir { tmp =>
val tableName = generateTableName