This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5309ccc3a4 [core] Introduce PurgeFilesProcedure to purge table (#4717)
5309ccc3a4 is described below
commit 5309ccc3a432960f0babdd413130b4e64182dc88
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Dec 16 18:43:15 2024 +0800
[core] Introduce PurgeFilesProcedure to purge table (#4717)
---
docs/content/flink/procedures.md | 21 ++++
docs/content/spark/procedures.md | 10 ++
.../flink/procedure/PurgeFilesProcedure.java | 66 ++++++++++++
.../flink/procedure/PurgeFilesProcedure.java | 78 ++++++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../flink/procedure/PurgeFilesProcedureITCase.java | 48 +++++++++
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../spark/procedure/PurgeFilesProcedure.java | 112 +++++++++++++++++++++
.../spark/procedure/PurgeFilesProcedureTest.scala | 44 ++++++++
9 files changed, 382 insertions(+)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 7a9b238073..8eb1786a08 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -434,6 +434,27 @@ All available procedures are listed below.
CALL sys.rollback_to_watermark(`table` => 'default.T', watermark =>
1730292023000)
</td>
</tr>
+ <tr>
+ <td>purge_files</td>
+ <td>
+ -- for Flink 1.18<br/>
+ -- clear table with purge files directly.<br/>
+ CALL sys.purge_files('identifier')<br/><br/>
+ -- for Flink 1.19 and later<br/>
+ -- clear table with purge files directly.<br/>
+ CALL sys.purge_files(`table` => 'default.T')<br/><br/>
+ </td>
+ <td>
+        To clear the table by deleting its data files directly. Argument:
+ <li>identifier: the target table identifier. Cannot be empty.</li>
+ </td>
+ <td>
+ -- for Flink 1.18<br/>
+        CALL sys.purge_files('default.T')<br/><br/>
+ -- for Flink 1.19 and later<br/>
+ CALL sys.purge_files(`table` => 'default.T')
+ </td>
+ </tr>
<tr>
<td>expire_snapshots</td>
<td>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 5b0efd5f90..bf7b8ae2d5 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -190,6 +190,16 @@ This section introduce all available spark procedures
about paimon.
CALL sys.rollback_to_watermark(table => 'default.T', watermark =>
1730292023000)<br/><br/>
</td>
</tr>
+ <tr>
+ <td>purge_files</td>
+ <td>
+      To clear the table by deleting its data files directly. Argument:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ </td>
+ <td>
+ CALL sys.purge_files(table => 'default.T')<br/><br/>
+ </td>
+ </tr>
<tr>
<td>migrate_database</td>
<td>
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
new file mode 100644
index 0000000000..3053eae3c7
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** A procedure to purge files for a table. */
+public class PurgeFilesProcedure extends ProcedureBase {
+ public static final String IDENTIFIER = "purge_files";
+
+ public String[] call(ProcedureContext procedureContext, String tableId)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ FileIO fileIO = fileStoreTable.fileIO();
+ Path tablePath = fileStoreTable.snapshotManager().tablePath();
+ try {
+ Arrays.stream(fileIO.listStatus(tablePath))
+ .filter(f -> !f.getPath().getName().contains("schema"))
+ .forEach(
+ fileStatus -> {
+ try {
+ fileIO.delete(fileStatus.getPath(), true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new String[] {
+ String.format("Success purge files with table: %s.",
fileStoreTable.name())
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
new file mode 100644
index 0000000000..7ee2a36104
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A procedure to purge files for a table. Usage:
+ *
+ * <pre><code>
+ * -- rollback to the snapshot which earlier or equal than watermark.
+ * CALL sys.purge_files(`table` => 'tableId')
+ * </code></pre>
+ */
+public class PurgeFilesProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "purge_files";
+
+ @ProcedureHint(argument = {@ArgumentHint(name = "table", type =
@DataTypeHint("STRING"))})
+ public String[] call(ProcedureContext procedureContext, String tableId)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ FileIO fileIO = fileStoreTable.fileIO();
+ Path tablePath = fileStoreTable.snapshotManager().tablePath();
+ try {
+ Arrays.stream(fileIO.listStatus(tablePath))
+ .filter(f -> !f.getPath().getName().contains("schema"))
+ .forEach(
+ fileStatus -> {
+ try {
+ fileIO.delete(fileStatus.getPath(), true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new String[] {
+ String.format("Success purge files with table: %s.",
fileStoreTable.name())
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6c3b0e7664..6251189560 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
org.apache.paimon.flink.procedure.ExpirePartitionsProcedure
+org.apache.paimon.flink.procedure.PurgeFilesProcedure
org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
new file mode 100644
index 0000000000..9eb9aad296
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link PurgeFilesProcedure}. */
+public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testPurgeFiles() throws Exception {
+ sql(
+ "CREATE TABLE T (id INT, name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED)"
+ + " WITH ('bucket'='1')");
+
+ sql("INSERT INTO T VALUES (1, 'a')");
+ assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+ sql("INSERT INTO T VALUES (1, 'a')");
+ sql("CALL sys.purge_files(`table` => 'default.T')");
+ assertThat(sql("select * from `T`")).containsExactly();
+
+ sql("INSERT INTO T VALUES (2, 'a')");
+ assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index b2fa66a150..f5052ea25f 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -35,6 +35,7 @@ import org.apache.paimon.spark.procedure.MigrateFileProcedure;
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
+import org.apache.paimon.spark.procedure.PurgeFilesProcedure;
import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure;
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
import org.apache.paimon.spark.procedure.RenameTagProcedure;
@@ -74,6 +75,7 @@ public class SparkProcedures {
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("rollback_to_timestamp",
RollbackToTimestampProcedure::builder);
procedureBuilders.put("rollback_to_watermark",
RollbackToWatermarkProcedure::builder);
+ procedureBuilders.put("purge_files", PurgeFilesProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
new file mode 100644
index 0000000000..8a7aec6e14
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** A procedure to purge files for a table. */
+public class PurgeFilesProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {ProcedureParameter.required("table",
StringType)};
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", StringType, true,
Metadata.empty())
+ });
+
+ private PurgeFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ FileIO fileIO = fileStoreTable.fileIO();
+ Path tablePath =
fileStoreTable.snapshotManager().tablePath();
+ try {
+ Arrays.stream(fileIO.listStatus(tablePath))
+ .filter(f ->
!f.getPath().getName().contains("schema"))
+ .forEach(
+ fileStatus -> {
+ try {
+
fileIO.delete(fileStatus.getPath(), true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ InternalRow outputRow =
+ newInternalRow(
+ UTF8String.fromString(
+ String.format(
+ "Success purge files with
table: %s.",
+ fileStoreTable.name())));
+ return new InternalRow[] {outputRow};
+ });
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<PurgeFilesProcedure>() {
+ @Override
+ public PurgeFilesProcedure doBuild() {
+ return new PurgeFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "PurgeFilesProcedure";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
new file mode 100644
index 0000000000..27eafe1c3d
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
class PurgeFilesProcedureTest extends PaimonSparkTestBase {

  test("Paimon procedure: purge files test") {
    spark.sql("""
                |CREATE TABLE T (id STRING, name STRING)
                |USING PAIMON
                |""".stripMargin)

    // The first write is readable.
    spark.sql("insert into T select '1', 'aa'")
    checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)

    // Purging removes all data files and leaves the table empty.
    spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
    checkAnswer(spark.sql("select * from test.T"), Nil)

    // The table remains usable after the purge.
    spark.sql("refresh table test.T")
    spark.sql("insert into T select '2', 'aa'")
    checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
  }

}