This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2a3b0b5af8ac03d0b5940ca1d77bf9b71f9040eb Author: XuQianJin-Stars <[email protected]> AuthorDate: Mon Jan 30 12:44:47 2023 +0800 add DeleteFsFileProcedure --- .../command/procedures/DeleteFsFileProcedure.scala | 80 ++++++++++++++++++++++ .../hudi/command/procedures/HoodieProcedures.scala | 1 + .../hudi/procedure/TestDeleteFsFileProcedure.scala | 47 +++++++++++++ 3 files changed, 128 insertions(+) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala new file mode 100644 index 00000000000..35f43ff1733 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteFsFileProcedure.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.util.control.NonFatal
+
+/**
+ * Spark SQL procedure `delete_fs_file`: deletes every file-system path matching `path`.
+ * Deletion is recursive, so a matching directory is removed with all of its contents.
+ */
+class DeleteFsFileProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("file", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Deletes each path matched by the `path` glob and returns one `(result, file)` row per match.
+   * An unmatched path yields no rows; a failed delete is reported with `result = false`.
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val path = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val delFilePath: Path = new Path(path)
+    val fs = FSUtils.getFs(delFilePath, jsc.hadoopConfiguration())
+    // globStatus returns null (not an empty array) when a non-glob path does not exist
+    val matched = Option(fs.globStatus(delFilePath)).getOrElse(Array.empty[FileStatus])
+    matched.map { status =>
+      val file = status.getPath
+      val deleted = try fs.delete(file, true) catch {
+        // put the cause into the message itself and report this path as not deleted
+        case NonFatal(e) =>
+          System.err.println(s"delete $file failed due to $e")
+          false
+      }
+      Row(deleted, file.toString)
+    }.toList
+  }
+
+  override def build: Procedure = new DeleteFsFileProcedure()
+}
+
+/** Name under which the procedure is registered, and the supplier of its builder. */
+object DeleteFsFileProcedure {
+  val NAME = "delete_fs_file"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new DeleteFsFileProcedure()
+  }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index c4a8ce81f2d..5d945ecbfdb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -90,6 +90,7 @@ object HoodieProcedures { ,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder) ,(HelpProcedure.NAME, HelpProcedure.builder) ,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder) + ,(DeleteFsFileProcedure.NAME, DeleteFsFileProcedure.builder) ) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala new file mode 100644 index 00000000000..8834e21a259 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDeleteFsFileProcedure.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+/** Checks that `call delete_fs_file` reports a successful delete and removes the path. */
+class TestDeleteFsFileProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call delete_fs_file Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           | primaryKey = 'id',
+           | preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      val result = spark.sql(s"""call delete_fs_file(path => '$tablePath')""").collect()
+      // a non-empty result alone would also pass when every delete reported false
+      assertResult(true)(result.nonEmpty && result.forall(_.getBoolean(0)))
+      assertResult(false)(new java.io.File(tablePath).exists())
+    }
+  }
+}
