This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e1a74fc55c0 [HUDI-7022] RunClusteringProcedure support limit parameter (#9975)
e1a74fc55c0 is described below

commit e1a74fc55c00afb29e998ea64bead8bcd0bd0565
Author: ksmou <[email protected]>
AuthorDate: Fri Nov 3 16:51:30 2023 +0800

    [HUDI-7022] RunClusteringProcedure support limit parameter (#9975)
---
 .../command/procedures/HoodieProcedureUtils.scala  |  9 +++-
 .../procedures/RunClusteringProcedure.scala        |  6 ++-
 .../hudi/procedure/TestClusteringProcedure.scala   | 55 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
index 3affe40d8f1..d1ffa82e9d3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
@@ -99,7 +99,8 @@ object HoodieProcedureUtils {
     }
   }
 
-  def filterPendingInstantsAndGetOperation(pendingInstants: Seq[String], specificInstants: Option[String], op: Option[String]): (Seq[String], Operation) = {
+  def filterPendingInstantsAndGetOperation(pendingInstants: Seq[String], specificInstants: Option[String],
+                                           op: Option[String], limit: Option[Int] = None): (Seq[String], Operation) = {
     specificInstants match {
       case Some(inst) =>
         if (op.exists(o => !Execute.value.equalsIgnoreCase(o))) {
@@ -109,7 +110,11 @@ object HoodieProcedureUtils {
         (HoodieProcedureUtils.checkAndFilterPendingInstants(pendingInstants, inst), Execute)
       case _ =>
         // No op specified, set it as 'scheduleAndExecute' default
-        (pendingInstants, op.map(o => Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+        if (limit.isDefined) {
+          (pendingInstants.take(limit.get), op.map(o => Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+        } else {
+          (pendingInstants, op.map(o => Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+        }
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index f0ab47c90f2..f642dfa3b3e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -56,7 +56,8 @@ class RunClusteringProcedure extends BaseProcedure
     // params => key=value, key2=value2
     ProcedureParameter.optional(7, "options", DataTypes.StringType),
     ProcedureParameter.optional(8, "instants", DataTypes.StringType),
-    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType)
+    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType),
+    ProcedureParameter.optional(10, "limit", DataTypes.IntegerType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,6 +84,7 @@ class RunClusteringProcedure extends BaseProcedure
     val options = getArgValueOrDefault(args, PARAMETERS(7))
     val specificInstants = getArgValueOrDefault(args, PARAMETERS(8))
     val parts = getArgValueOrDefault(args, PARAMETERS(9))
+    val limit = getArgValueOrDefault(args, PARAMETERS(10))
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -149,7 +151,7 @@ class RunClusteringProcedure extends BaseProcedure
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
 
     var (filteredPendingClusteringInstants, operation) = HoodieProcedureUtils.filterPendingInstantsAndGetOperation(
-      pendingClusteringInstants, specificInstants.asInstanceOf[Option[String]], op.asInstanceOf[Option[String]])
+      pendingClusteringInstants, specificInstants.asInstanceOf[Option[String]], op.asInstanceOf[Option[String]], limit.asInstanceOf[Option[Int]])
 
     var client: SparkRDDWriteClient[_] = null
     try {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index dee8304b493..b2c73332567 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -714,6 +714,61 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_clustering with limit parameter") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+     """.stripMargin)
+
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(metaClient.getActiveTimeline.filterPendingReplaceTimeline().empty())
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(2 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute', limit => 1)")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(1 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+    }
+  }
+
   def avgRecord(commitTimeline: HoodieTimeline): Long = {
     var totalByteSize = 0L
     var totalRecordsCount = 0L

Reply via email to