This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f9eb91a9ef [HUDI-3503] Support more feature to call procedure CleanCommand (#6353)
f9eb91a9ef is described below

commit f9eb91a9ef70f865c9e1496e1474c740649907dc
Author: KnightChess <[email protected]>
AuthorDate: Wed Aug 10 16:30:31 2022 +0800

    [HUDI-3503] Support more feature to call procedure CleanCommand (#6353)
---
 .../command/procedures/RunCleanProcedure.scala     | 23 +++++----
 .../sql/hudi/procedure/TestCleanProcedure.scala    | 56 +++++++++++++++++++++-
 2 files changed, 69 insertions(+), 10 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 2a0143bafb..36580176d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.config.HoodieCleanConfig
+import org.apache.hudi.table.action.clean.CleaningTriggerStrategy
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
@@ -33,8 +34,12 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, 
false),
     ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, 
true),
-    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
+    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, 
HoodieCleanConfig.CLEANER_POLICY.defaultValue()),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType, 
HoodieCleanConfig.CLEANER_HOURS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(6, "file_versions_retained", 
DataTypes.IntegerType, 
HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType, 
HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.defaultValue()),
+    ProcedureParameter.optional(8, "trigger_max_commits", 
DataTypes.IntegerType, HoodieCleanConfig.CLEAN_MAX_COMMITS.defaultValue().toInt)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -64,16 +69,16 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val skipLocking = getArgValueOrDefault(args, 
PARAMETERS(1)).get.asInstanceOf[Boolean]
     val scheduleInLine = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Boolean]
-    val cleanPolicy = getArgValueOrDefault(args, PARAMETERS(3))
-    val retainCommits = getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[Integer]
     val basePath = getBasePath(tableName, Option.empty)
     val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
-    var props: Map[String, String] = Map(
-      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> 
String.valueOf(retainCommits)
+    val props: Map[String, String] = Map(
+      HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args, 
PARAMETERS(3)).get.toString,
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(4)).get.toString,
+      HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(5)).get.toString,
+      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(6)).get.toString,
+      HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> 
getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
+      HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, 
PARAMETERS(8)).get.toString
     )
-    if (cleanPolicy.isDefined) {
-      props += (HoodieCleanConfig.CLEANER_POLICY.key() -> 
String.valueOf(cleanPolicy.get))
-    }
     val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, 
basePath, props)
     val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, 
skipLocking)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
index 7986c304c7..4dd950e382 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -46,6 +46,7 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
       spark.sql(s"update $tableName set price = 12 where id = 1")
       spark.sql(s"update $tableName set price = 13 where id = 1")
 
+      // KEEP_LATEST_COMMITS
       val result1 = spark.sql(s"call run_clean(table => '$tableName', 
retain_commits => 1)")
         .collect()
         .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), 
row.getString(3), row.getString(4), row.getInt(5)))
@@ -53,6 +54,10 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
       assertResult(1)(result1.length)
       assertResult(2)(result1(0)(2))
 
+      val result11 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
+      assertResult(2)(result11.length)
+
       checkAnswer(s"select id, name, price, ts from $tableName order by id") (
         Seq(1, "a1", 13, 1000)
       )
@@ -60,7 +65,56 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
       val result2 = spark.sql(s"call run_clean(table => '$tableName', 
retain_commits => 1)")
         .collect()
       assertResult(0)(result2.length)
+
+      // KEEP_LATEST_FILE_VERSIONS
+      spark.sql(s"update $tableName set price = 14 where id = 1")
+
+      val result3 = spark.sql(
+        s"call run_clean(table => '$tableName', clean_policy => 
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
+      assertResult(0)(result3.length)
+
+      val result4 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
+      assertResult(3)(result4.length)
+
+      val result5 = spark.sql(
+        s"call run_clean(table => '$tableName', clean_policy => 
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+      assertResult(1)(result5.length)
+      assertResult(2)(result5(0)(2))
+
+      val result6 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
+      assertResult(1)(result6.length)
+
+      checkAnswer(s"select id, name, price, ts from $tableName order by id") (
+        Seq(1, "a1", 14, 1000)
+      )
+
+      // trigger time
+      spark.sql(s"update $tableName set price = 15 where id = 1")
+
+      // no trigger, only has 1 commit
+      val result7 = spark.sql(
+        s"call run_clean(table => '$tableName', trigger_max_commits => 2, 
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 
1)").collect()
+      assertResult(0)(result7.length)
+
+      val result8 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
+      assertResult(2)(result8.length)
+
+      // trigger
+      val result9 = spark.sql(
+        s"call run_clean(table => '$tableName', trigger_max_commits => 1, 
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 
1)").collect()
+      assertResult(1)(result9.length)
+      assertResult(1)(result9(0)(2))
+
+      val result10 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
+      assertResult(1)(result10.length)
+
+      checkAnswer(s"select id, name, price, ts from $tableName order by id") (
+        Seq(1, "a1", 15, 1000)
+      )
     }
   }
-
 }

Reply via email to