This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f9eb91a9ef [HUDI-3503] Support more feature to call procedure CleanCommand (#6353)
f9eb91a9ef is described below
commit f9eb91a9ef70f865c9e1496e1474c740649907dc
Author: KnightChess <[email protected]>
AuthorDate: Wed Aug 10 16:30:31 2022 +0800
[HUDI-3503] Support more feature to call procedure CleanCommand (#6353)
---
.../command/procedures/RunCleanProcedure.scala | 23 +++++----
.../sql/hudi/procedure/TestCleanProcedure.scala | 56 +++++++++++++++++++++-
2 files changed, 69 insertions(+), 10 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 2a0143bafb..36580176d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.config.HoodieCleanConfig
+import org.apache.hudi.table.action.clean.CleaningTriggerStrategy
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
@@ -33,8 +34,12 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType,
false),
ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType,
true),
- ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
- ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
+ ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType,
HoodieCleanConfig.CLEANER_POLICY.defaultValue()),
+ ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType,
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue().toInt),
+ ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType,
HoodieCleanConfig.CLEANER_HOURS_RETAINED.defaultValue().toInt),
+ ProcedureParameter.optional(6, "file_versions_retained",
DataTypes.IntegerType,
HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.defaultValue().toInt),
+ ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType,
HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.defaultValue()),
+ ProcedureParameter.optional(8, "trigger_max_commits",
DataTypes.IntegerType, HoodieCleanConfig.CLEAN_MAX_COMMITS.defaultValue().toInt)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -64,16 +69,16 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val skipLocking = getArgValueOrDefault(args,
PARAMETERS(1)).get.asInstanceOf[Boolean]
val scheduleInLine = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Boolean]
- val cleanPolicy = getArgValueOrDefault(args, PARAMETERS(3))
- val retainCommits = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[Integer]
val basePath = getBasePath(tableName, Option.empty)
val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
- var props: Map[String, String] = Map(
- HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() ->
String.valueOf(retainCommits)
+ val props: Map[String, String] = Map(
+ HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args,
PARAMETERS(3)).get.toString,
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() ->
getArgValueOrDefault(args, PARAMETERS(4)).get.toString,
+ HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() ->
getArgValueOrDefault(args, PARAMETERS(5)).get.toString,
+ HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() ->
getArgValueOrDefault(args, PARAMETERS(6)).get.toString,
+ HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() ->
getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
+ HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args,
PARAMETERS(8)).get.toString
)
- if (cleanPolicy.isDefined) {
- props += (HoodieCleanConfig.CLEANER_POLICY.key() ->
String.valueOf(cleanPolicy.get))
- }
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession,
basePath, props)
val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine,
skipLocking)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
index 7986c304c7..4dd950e382 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -46,6 +46,7 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
spark.sql(s"update $tableName set price = 12 where id = 1")
spark.sql(s"update $tableName set price = 13 where id = 1")
+ // KEEP_LATEST_COMMITS
val result1 = spark.sql(s"call run_clean(table => '$tableName',
retain_commits => 1)")
.collect()
.map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2),
row.getString(3), row.getString(4), row.getInt(5)))
@@ -53,6 +54,10 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
assertResult(1)(result1.length)
assertResult(2)(result1(0)(2))
+ val result11 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
+ assertResult(2)(result11.length)
+
checkAnswer(s"select id, name, price, ts from $tableName order by id") (
Seq(1, "a1", 13, 1000)
)
@@ -60,7 +65,56 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
val result2 = spark.sql(s"call run_clean(table => '$tableName',
retain_commits => 1)")
.collect()
assertResult(0)(result2.length)
+
+ // KEEP_LATEST_FILE_VERSIONS
+ spark.sql(s"update $tableName set price = 14 where id = 1")
+
+ val result3 = spark.sql(
+ s"call run_clean(table => '$tableName', clean_policy =>
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
+ assertResult(0)(result3.length)
+
+ val result4 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
+ assertResult(3)(result4.length)
+
+ val result5 = spark.sql(
+ s"call run_clean(table => '$tableName', clean_policy =>
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+ assertResult(1)(result5.length)
+ assertResult(2)(result5(0)(2))
+
+ val result6 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
+ assertResult(1)(result6.length)
+
+ checkAnswer(s"select id, name, price, ts from $tableName order by id") (
+ Seq(1, "a1", 14, 1000)
+ )
+
+ // clean trigger (trigger_max_commits)
+ spark.sql(s"update $tableName set price = 15 where id = 1")
+
+ // not triggered: only 1 commit since the last clean, fewer than trigger_max_commits (2)
+ val result7 = spark.sql(
+ s"call run_clean(table => '$tableName', trigger_max_commits => 2,
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained =>
1)").collect()
+ assertResult(0)(result7.length)
+
+ val result8 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
+ assertResult(2)(result8.length)
+
+ // trigger
+ val result9 = spark.sql(
+ s"call run_clean(table => '$tableName', trigger_max_commits => 1,
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained =>
1)").collect()
+ assertResult(1)(result9.length)
+ assertResult(1)(result9(0)(2))
+
+ val result10 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
+ assertResult(1)(result10.length)
+
+ checkAnswer(s"select id, name, price, ts from $tableName order by id") (
+ Seq(1, "a1", 15, 1000)
+ )
}
}
-
}