This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e1a74fc55c0 [HUDI-7022] RunClusteringProcedure support limit parameter (#9975)
e1a74fc55c0 is described below
commit e1a74fc55c00afb29e998ea64bead8bcd0bd0565
Author: ksmou <[email protected]>
AuthorDate: Fri Nov 3 16:51:30 2023 +0800
[HUDI-7022] RunClusteringProcedure support limit parameter (#9975)
---
.../command/procedures/HoodieProcedureUtils.scala | 9 +++-
.../procedures/RunClusteringProcedure.scala | 6 ++-
.../hudi/procedure/TestClusteringProcedure.scala | 55 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 4 deletions(-)
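
In short, the patch lets a single run_clustering call cap how many pending clustering plans it executes. A minimal end-to-end sketch (Scala, via spark.sql; the table name 't1' is hypothetical), mirroring the call shape used in the new test below:

    spark.sql("call run_clustering(table => 't1', op => 'schedule')")             // plan 1 pending
    spark.sql("call run_clustering(table => 't1', op => 'schedule')")             // plan 2 pending
    spark.sql("call run_clustering(table => 't1', op => 'execute', limit => 1)")  // executes only the earliest plan; the other stays pending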
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
index 3affe40d8f1..d1ffa82e9d3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
@@ -99,7 +99,8 @@ object HoodieProcedureUtils {
     }
   }
 
-  def filterPendingInstantsAndGetOperation(pendingInstants: Seq[String], specificInstants: Option[String], op: Option[String]): (Seq[String], Operation) = {
+  def filterPendingInstantsAndGetOperation(pendingInstants: Seq[String], specificInstants: Option[String],
+                                           op: Option[String], limit: Option[Int] = None): (Seq[String], Operation) = {
     specificInstants match {
       case Some(inst) =>
         if (op.exists(o => !Execute.value.equalsIgnoreCase(o))) {
@@ -109,7 +110,11 @@ object HoodieProcedureUtils {
         (HoodieProcedureUtils.checkAndFilterPendingInstants(pendingInstants, inst), Execute)
       case _ =>
         // No op specified, set it as 'scheduleAndExecute' default
-        (pendingInstants, op.map(o => Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+        if (limit.isDefined) {
+          (pendingInstants.take(limit.get), op.map(o => Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+        } else {
+          (pendingInstants, op.map(o => Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+        }
     }
   }
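
Note the new branch only truncates the pending-instant list; operation resolution is unchanged. For illustration only (not part of the patch), an equivalent single-expression form:

    val capped = limit.map(n => pendingInstants.take(n)).getOrElse(pendingInstants)
    (capped, op.map(o => Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))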
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index f0ab47c90f2..f642dfa3b3e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -56,7 +56,8 @@ class RunClusteringProcedure extends BaseProcedure
     // params => key=value, key2=value2
     ProcedureParameter.optional(7, "options", DataTypes.StringType),
     ProcedureParameter.optional(8, "instants", DataTypes.StringType),
-    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType)
+    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType),
+    ProcedureParameter.optional(10, "limit", DataTypes.IntegerType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,6 +84,7 @@ class RunClusteringProcedure extends BaseProcedure
     val options = getArgValueOrDefault(args, PARAMETERS(7))
     val specificInstants = getArgValueOrDefault(args, PARAMETERS(8))
     val parts = getArgValueOrDefault(args, PARAMETERS(9))
+    val limit = getArgValueOrDefault(args, PARAMETERS(10))
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -149,7 +151,7 @@ class RunClusteringProcedure extends BaseProcedure
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
     var (filteredPendingClusteringInstants, operation) = HoodieProcedureUtils.filterPendingInstantsAndGetOperation(
-      pendingClusteringInstants, specificInstants.asInstanceOf[Option[String]], op.asInstanceOf[Option[String]])
+      pendingClusteringInstants, specificInstants.asInstanceOf[Option[String]], op.asInstanceOf[Option[String]], limit.asInstanceOf[Option[Int]])
 
     var client: SparkRDDWriteClient[_] = null
     try {
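
Because the new parameter is optional with no default value, existing call sites keep working: the argument arrives as an Option[Int] (hence the cast), and an omitted limit leaves the pending list uncapped. A usage sketch with a hypothetical table name and limit value:

    spark.sql("call run_clustering(table => 't1', op => 'execute')")              // previous behavior: all pending plans
    spark.sql("call run_clustering(table => 't1', op => 'execute', limit => 2)")  // at most two plans per call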
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index dee8304b493..b2c73332567 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -714,6 +714,61 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_clustering with limit parameter") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+       """.stripMargin)
+
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(metaClient.getActiveTimeline.filterPendingReplaceTimeline().empty())
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(2 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute', limit => 1)")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(1 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+    }
+  }
+
   def avgRecord(commitTimeline: HoodieTimeline): Long = {
     var totalByteSize = 0L
     var totalRecordsCount = 0L
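
To follow the final assertions in the test added above: the first two schedule calls create two pending plans, and the unbounded 'execute' completes both (completed = 2, pending = 0); two more schedule calls again leave two pending, and 'execute' with limit => 1 completes only the earliest, ending at completed = 3, pending = 1.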