This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e208d0168fb7f9c86594fb8e3f65481fe008158a
Author: XuQianJin-Stars <[email protected]>
AuthorDate: Sun Dec 18 20:23:25 2022 +0800

    add DeleteRollbackInstantProcedure
---
 .../DeleteRollbackInstantProcedure.scala           | 73 ++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |  1 +
 .../TestDeleteRollbackInstantProcedure.scala       | 89 ++++++++++++++++++++++
 3 files changed, 163 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteRollbackInstantProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteRollbackInstantProcedure.scala
new file mode 100644
index 00000000000..95f21502d97
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteRollbackInstantProcedure.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieInstant}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+/**
+ * Call procedure that deletes the instant files of all pending (inflight or requested)
+ * rollback instants found on the active timeline of a Hudi table.
+ *
+ * Input: one required string argument, `table`.
+ * Output: a single row with a boolean `result` column, which is `true` whenever the call
+ * finishes without an exception being thrown.
+ */
+class DeleteRollbackInstantProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  // Only the table name is needed; the base path is resolved from it in `call`.
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Removes every pending rollback instant file of the given table.
+   *
+   * @param args procedure arguments; must contain the `table` parameter
+   * @return one row holding `true`
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getBasePath(tableName)
+
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(jsc.hadoopConfiguration())
+      .setBasePath(tablePath)
+      .build
+
+    // filterInflightsAndRequested() already contains the inflight instants, so a separate
+    // filterInflights() pass would only log and delete the same files a second time.
+    // Materialise the instants once, before any file is removed.
+    val pendingRollbacks = metaClient.getActiveTimeline.getRollbackTimeline
+      .filterInflightsAndRequested().getInstants.iterator().asScala.toList
+
+    // Inflight instant files are removed first, then the requested ones.
+    val (inflight, requested) = pendingRollbacks.partition(_.isInflight)
+    (inflight ++ requested).foreach { instant =>
+      logWarning(s"Trying to remove rollback instant file: $instant")
+      HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, metaClient.getMetaPath, instant)
+    }
+
+    Seq(Row(true))
+  }
+
+  override def build: Procedure = new DeleteRollbackInstantProcedure()
+}
+
+/** Companion exposing the procedure's name and a factory for its builder. */
+object DeleteRollbackInstantProcedure {
+  // Identifier paired with `builder` when the procedure is registered.
+  val NAME: String = "delete_rollback_instant"
+
+  // Every get() yields a fresh procedure instance, which doubles as its own builder.
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new DeleteRollbackInstantProcedure()
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index a2f28167466..c4a8ce81f2d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -89,6 +89,7 @@ object HoodieProcedures {
       ,(ShowCommitExtraMetadataProcedure.NAME, 
ShowCommitExtraMetadataProcedure.builder)
       ,(ShowTablePropertiesProcedure.NAME, 
ShowTablePropertiesProcedure.builder)
       ,(HelpProcedure.NAME, HelpProcedure.builder)
+      ,(DeleteRollbackInstantProcedure.NAME, 
DeleteRollbackInstantProcedure.builder)
     )
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteRollbackInstantProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteRollbackInstantProcedure.scala
new file mode 100644
index 00000000000..35fd3b6a1c0
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteRollbackInstantProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieInstant}
+import org.apache.spark.sql.catalyst.expressions.Log
+
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+class TestDeleteRollbackInstantProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test Call delete_rollback_instant Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Scenario: create a table, roll back two of its three commits, delete the rollback instant files, then call delete_rollback_instant. First, create the table.
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+     """.stripMargin)
+      // insert three rows with three separate statements, so show_commits below is expected to list three commits
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+
+      // 3 commits exist before any rollback
+      var commits = spark.sql(s"""call show_commits(table => '$tableName', 
limit => 10)""").collect()
+      assertResult(3) {
+        commits.length
+      }
+
+      // Call rollback_to_instant with named arguments; the instant time is the first column of the first show_commits row
+      var instant_time = commits(0).get(0).toString
+      checkAnswer(s"""call rollback_to_instant(table => '$tableName', 
instant_time => '$instant_time')""")(Seq(true))
+      // Call rollback_to_instant with positional arguments; the instant time is the first column of the second show_commits row
+      instant_time = commits(1).get(0).toString
+      checkAnswer(s"""call rollback_to_instant('$tableName', 
'$instant_time')""")(Seq(true))
+
+      // 1 commit is left after the two rollbacks
+      commits = spark.sql(s"""call show_commits(table => '$tableName', limit 
=> 10)""").collect()
+      assertResult(1) {
+        commits.length
+      }
+
+      // collect rollbacks for the table; one entry is expected per rollback_to_instant call above
+      val rollbacks = spark.sql(s"""call show_rollbacks(table => '$tableName', 
limit => 10)""").collect()
+      assertResult(2) {
+        rollbacks.length
+      }
+
+      // delete the instant file of every instant on the rollback timeline; presumably this leaves the rollbacks in a pending (inflight/requested) state for the procedure to clean up - TODO confirm
+      val metaClient = 
HoodieTableMetaClient.builder.setConf(spark.sparkContext.hadoopConfiguration)
+        .setBasePath(s"${tmp.getCanonicalPath}/$tableName").build
+      val rollbackTimeline = metaClient.getActiveTimeline.getRollbackTimeline
+      rollbackTimeline.getInstants.iterator().asScala.foreach((instant: 
HoodieInstant) => {
+        HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, 
metaClient.getMetaPath, instant)
+      })
+
+      checkAnswer(s"""call delete_rollback_instant(table => 
'$tableName')""")(Seq(true))
+    }
+  }
+}

Reply via email to