This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 80368a049d [HUDI-3503] Add call procedure for CleanCommand (#6065)
80368a049d is described below

commit 80368a049d0764b45ae0ce998a0e7d5897858615
Author: simonsssu <[email protected]>
AuthorDate: Sat Jul 16 22:33:26 2022 +0800

    [HUDI-3503] Add call procedure for CleanCommand (#6065)
    
    * [HUDI-3503] Add call procedure for CleanCommand
    Co-authored-by: simonssu <[email protected]>
---
 .../hudi/command/procedures/HoodieProcedures.scala |  1 +
 .../command/procedures/RunCleanProcedure.scala     | 95 ++++++++++++++++++++++
 .../sql/hudi/procedure/TestCleanProcedure.scala    | 64 +++++++++++++++
 3 files changed, 160 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 1eb82d97c5..1a9404d265 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -80,6 +80,7 @@ object HoodieProcedures {
     mapBuilder.put(RepairDeduplicateProcedure.NAME, 
RepairDeduplicateProcedure.builder)
     mapBuilder.put(RepairMigratePartitionMetaProcedure.NAME, 
RepairMigratePartitionMetaProcedure.builder)
     mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, 
RepairOverwriteHoodiePropsProcedure.builder)
+    mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
     mapBuilder.build
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
new file mode 100644
index 0000000000..6e3d2e9dcb
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import java.util.function.Supplier
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.util.JsonUtils
+import org.apache.hudi.config.HoodieCleanConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
/**
 * Spark SQL procedure that triggers a Hudi clean on the given table, e.g.
 * {{{ call run_clean(table => 't1', retainCommits => 1) }}}.
 *
 * Produces a single summary row describing the clean that ran, or an empty
 * row when there was nothing to clean.
 */
class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging {

  // Arguments accepted by the procedure. Only `table` is mandatory; the
  // optional arguments mirror the cleaner write-config defaults.
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "skipLocking", DataTypes.BooleanType, false),
    ProcedureParameter.optional(2, "scheduleInLine", DataTypes.BooleanType, true),
    ProcedureParameter.optional(3, "cleanPolicy", DataTypes.StringType, None),
    ProcedureParameter.optional(4, "retainCommits", DataTypes.IntegerType, 10)
  )

  // One output row per invocation, summarizing the resulting clean metadata.
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("start_clean_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("time_taken_in_millis", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_files_deleted", DataTypes.IntegerType, nullable = true, Metadata.empty),
    StructField("earliest_commit_to_retain", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("bootstrap_part_metadata", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("version", DataTypes.IntegerType, nullable = true, Metadata.empty)
  ))

  override def build: Procedure = new RunCleanProcedure

  /**
   * Returns the input parameters of this procedure.
   */
  override def parameters: Array[ProcedureParameter] = PARAMETERS

  /**
   * Returns the type of rows produced by this procedure.
   */
  override def outputType: StructType = OUTPUT_TYPE

  /**
   * Runs a clean on the target table.
   *
   * @param args procedure arguments, validated against [[parameters]]
   * @return a single row matching [[outputType]], or `Row.empty` when
   *         no clean metadata was produced
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val skipLocking = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
    val scheduleInLine = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
    val cleanPolicy = getArgValueOrDefault(args, PARAMETERS(3))
    val retainCommits = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Integer]
    val basePath = getBasePath(tableName, Option.empty)
    val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()

    var props: Map[String, String] = Map(
      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> String.valueOf(retainCommits)
    )
    // Only override the cleaning policy when the caller supplied one.
    if (cleanPolicy.isDefined) {
      props += (HoodieCleanConfig.CLEANER_POLICY.key() -> String.valueOf(cleanPolicy.get))
    }

    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
    try {
      val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)

      // clean() may return null when there was nothing to clean.
      if (hoodieCleanMeta == null) Seq(Row.empty)
      else Seq(Row(hoodieCleanMeta.getStartCleanTime,
        hoodieCleanMeta.getTimeTakenInMillis,
        hoodieCleanMeta.getTotalFilesDeleted,
        hoodieCleanMeta.getEarliestCommitToRetain,
        JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
        hoodieCleanMeta.getVersion))
    } finally {
      // BUGFIX: the write client was previously never closed, leaking its
      // resources (e.g. the embedded timeline server) on every invocation.
      client.close()
    }
  }
}
+
object RunCleanProcedure {
  // Name under which this procedure is registered in HoodieProcedures.
  val NAME = "run_clean"

  /** Factory used by the procedure registry to obtain fresh builder instances. */
  def builder: Supplier[ProcedureBuilder] = {
    new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new RunCleanProcedure
    }
  }
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
new file mode 100644
index 0000000000..e0d61cbb07
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
class TestCleanProcedure extends HoodieSparkSqlTestBase {

  test("Test Call run_clean Procedure by Table") {
    withTempDir { tmp =>
      val tableName = generateTableName

      // Create a COW table backed by the temp directory.
      spark.sql(
        s"""
          |create table $tableName (
          | id int,
          | name string,
          | price double,
          | ts long
          | ) using hudi
          | location '${tmp.getCanonicalPath}'
          | tblproperties (
          |   primaryKey = 'id',
          |   type = 'cow',
          |   preCombineField = 'ts'
          | )
          |""".stripMargin)

      // Force small files so each write produces a new file slice, then
      // build up several commits on the same record.
      spark.sql("set hoodie.parquet.max.file.size = 10000")
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      spark.sql(s"update $tableName set price = 11 where id = 1")
      spark.sql(s"update $tableName set price = 12 where id = 1")
      spark.sql(s"update $tableName set price = 13 where id = 1")

      // Clean while retaining only the latest commit.
      val cleanRows = spark
        .sql(s"call run_clean(table => '$tableName', retainCommits => 1)")
        .collect()
        .map { row =>
          Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5))
        }

      // Exactly one summary row, reporting two deleted files.
      assertResult(1)(cleanRows.length)
      assertResult(2)(cleanRows(0)(2))

      // The latest data must survive the clean.
      checkAnswer(s"select id, name, price, ts from $tableName order by id") (
        Seq(1, "a1", 13, 1000)
      )
    }
  }

}

Reply via email to