This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e208d0168fb7f9c86594fb8e3f65481fe008158a Author: XuQianJin-Stars <[email protected]> AuthorDate: Sun Dec 18 20:23:25 2022 +0800 add DeleteRollbackInstantProcedure --- .../DeleteRollbackInstantProcedure.scala | 73 ++++++++++++++++++ .../hudi/command/procedures/HoodieProcedures.scala | 1 + .../TestDeleteRollbackInstantProcedure.scala | 89 ++++++++++++++++++++++ 3 files changed, 163 insertions(+) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteRollbackInstantProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteRollbackInstantProcedure.scala new file mode 100644 index 00000000000..95f21502d97 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteRollbackInstantProcedure.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+/**
+ * Spark SQL procedure `delete_rollback_instant`: deletes the instant file of every
+ * inflight or requested rollback found on the active timeline of the given table.
+ */
+class DeleteRollbackInstantProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /** Deletes the pending rollback instant files; always returns one row holding `true`. */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
+
+    // Inflight instants are removed ahead of requested ones, and each pending instant
+    // is logged and deleted exactly once.
+    val (inflight, requested) = metaClient.getActiveTimeline.getRollbackTimeline
+      .filterInflightsAndRequested().getInstants.iterator().asScala.toList.partition(_.isInflight)
+    (inflight ++ requested).foreach { instant: HoodieInstant =>
+      logWarning(s"Trying to remove rollback instant file: $instant")
+      HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, metaClient.getMetaPath, instant)
+    }
+    Seq(Row(true))
+  }
+
+  override def build: Procedure = new DeleteRollbackInstantProcedure()
+}
+
+object DeleteRollbackInstantProcedure {
+  val NAME = "delete_rollback_instant"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new DeleteRollbackInstantProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index a2f28167466..c4a8ce81f2d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -89,6 +89,7 @@ object HoodieProcedures {
       ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
       ,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
       ,(HelpProcedure.NAME, HelpProcedure.builder)
+      ,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteRollbackInstantProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteRollbackInstantProcedure.scala
new file mode 100644
index 00000000000..35fd3b6a1c0
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteRollbackInstantProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.spark.sql.catalyst.expressions.Log
+
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class TestDeleteRollbackInstantProcedure extends HoodieSparkProcedureTestBase {
+  // Rolls back two of three commits, deletes the rollback instant files, then calls the procedure.
+  test("Test Call delete_rollback_instant Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           | primaryKey = 'id',
+           | preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert three rows, one insert statement each
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+
+      // 3 commits exist before any rollback
+      var commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(3) {
+        commits.length
+      }
+
+      // Call rollback_to_instant Procedure with Named Arguments
+      var instant_time = commits(0).get(0).toString
+      checkAnswer(s"""call rollback_to_instant(table => '$tableName', instant_time => '$instant_time')""")(Seq(true))
+      // Call rollback_to_instant Procedure with Positional Arguments
+      instant_time = commits(1).get(0).toString
+      checkAnswer(s"""call rollback_to_instant('$tableName', '$instant_time')""")(Seq(true))
+
+      // 1 commit is left after the two rollbacks
+      commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(1) {
+        commits.length
+      }
+
+      // collect rollbacks for table
+      val rollbacks = spark.sql(s"""call show_rollbacks(table => '$tableName', limit => 10)""").collect()
+      assertResult(2) {
+        rollbacks.length
+      }
+
+      // delete each rollback instant file on the timeline; intended to leave the rollbacks inflight
+      val metaClient = HoodieTableMetaClient.builder.setConf(spark.sparkContext.hadoopConfiguration)
+        .setBasePath(s"${tmp.getCanonicalPath}/$tableName").build
+      val rollbackTimeline = metaClient.getActiveTimeline.getRollbackTimeline
+      rollbackTimeline.getInstants.iterator().asScala.foreach((instant: HoodieInstant) => {
+        HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, metaClient.getMetaPath, instant)
+      })
+
+      checkAnswer(s"""call delete_rollback_instant(table => '$tableName')""")(Seq(true))
+    }
+  }
+}
