This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5309ccc3a4 [core] Introduce PurgeFilesProcedure to purge table (#4717)
5309ccc3a4 is described below

commit 5309ccc3a432960f0babdd413130b4e64182dc88
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Dec 16 18:43:15 2024 +0800

    [core] Introduce PurgeFilesProcedure to purge table (#4717)
---
 docs/content/flink/procedures.md                   |  21 ++++
 docs/content/spark/procedures.md                   |  10 ++
 .../flink/procedure/PurgeFilesProcedure.java       |  66 ++++++++++++
 .../flink/procedure/PurgeFilesProcedure.java       |  78 ++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../flink/procedure/PurgeFilesProcedureITCase.java |  48 +++++++++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/procedure/PurgeFilesProcedure.java       | 112 +++++++++++++++++++++
 .../spark/procedure/PurgeFilesProcedureTest.scala  |  44 ++++++++
 9 files changed, 382 insertions(+)
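
For quick reference, the new procedure is invoked as follows (a usage sketch drawn
from the documentation added in this commit; 'default.T' is a placeholder
identifier, and Spark may need a catalog prefix such as paimon.sys depending on
the session catalog):

    -- Flink SQL, Flink 1.18 (positional argument)
    CALL sys.purge_files('default.T')

    -- Flink SQL, Flink 1.19 and later (named argument)
    CALL sys.purge_files(`table` => 'default.T')

    -- Spark SQL
    CALL sys.purge_files(table => 'default.T')

The procedure deletes everything under the table path except entries whose name
contains "schema", so the table is emptied of data files and snapshots while its
schema is preserved and it stays writable.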

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 7a9b238073..8eb1786a08 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -434,6 +434,27 @@ All available procedures are listed below.
          CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
       </td>
    </tr>
+   <tr>
+      <td>purge_files</td>
+      <td>
+         -- for Flink 1.18<br/>
+         -- clear the table by directly purging its files.<br/>
+         CALL sys.purge_files('identifier')<br/><br/>
+         -- for Flink 1.19 and later<br/>
+         -- clear the table by directly purging its files.<br/>
+         CALL sys.purge_files(`table` => 'default.T')<br/><br/>
+      </td>
+      <td>
+         To clear a table by directly purging its files. Argument:
+            <li>identifier: the target table identifier. Cannot be empty.</li>
+      </td>
+      <td>
+         -- for Flink 1.18<br/>
+         CALL sys.purge_files('default.T')<br/><br/>
+         -- for Flink 1.19 and later<br/>
+         CALL sys.purge_files(`table` => 'default.T')
+      </td>
+   </tr>
    <tr>
       <td>expire_snapshots</td>
       <td>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 5b0efd5f90..bf7b8ae2d5 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -190,6 +190,16 @@ This section introduce all available spark procedures about paimon.
           CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)<br/><br/>
       </td>
     </tr>
+    <tr>
+      <td>purge_files</td>
+      <td>
+         To clear a table by directly purging its files. Argument:
+            <li>table: the target table identifier. Cannot be empty.</li>
+      </td>
+      <td>
+          CALL sys.purge_files(table => 'default.T')<br/><br/>
+      </td>
+    </tr>
     <tr>
       <td>migrate_database</td>
       <td>
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
new file mode 100644
index 0000000000..3053eae3c7
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** A procedure to purge files for a table. */
+public class PurgeFilesProcedure extends ProcedureBase {
+    public static final String IDENTIFIER = "purge_files";
+
+    public String[] call(ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        FileIO fileIO = fileStoreTable.fileIO();
+        Path tablePath = fileStoreTable.snapshotManager().tablePath();
+        try {
+            Arrays.stream(fileIO.listStatus(tablePath))
+                    .filter(f -> !f.getPath().getName().contains("schema"))
+                    .forEach(
+                            fileStatus -> {
+                                try {
+                                    fileIO.delete(fileStatus.getPath(), true);
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return new String[] {
+            String.format("Successfully purged files for table: %s.", fileStoreTable.name())
+        };
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
new file mode 100644
index 0000000000..7ee2a36104
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A procedure to purge files for a table. Usage:
+ *
+ * <pre><code>
+ *  -- clear the table by purging its files directly (schema files are kept).
+ *  CALL sys.purge_files(`table` => 'tableId')
+ * </code></pre>
+ */
+public class PurgeFilesProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "purge_files";
+
+    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
+    public String[] call(ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        FileIO fileIO = fileStoreTable.fileIO();
+        Path tablePath = fileStoreTable.snapshotManager().tablePath();
+        try {
+            Arrays.stream(fileIO.listStatus(tablePath))
+                    .filter(f -> !f.getPath().getName().contains("schema"))
+                    .forEach(
+                            fileStatus -> {
+                                try {
+                                    fileIO.delete(fileStatus.getPath(), true);
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return new String[] {
+            String.format("Successfully purged files for table: %s.", fileStoreTable.name())
+        };
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6c3b0e7664..6251189560 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
 org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
 org.apache.paimon.flink.procedure.ExpirePartitionsProcedure
+org.apache.paimon.flink.procedure.PurgeFilesProcedure
 org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
 org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
 org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
new file mode 100644
index 0000000000..9eb9aad296
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link PurgeFilesProcedure}. */
+public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testPurgeFiles() throws Exception {
+        sql(
+                "CREATE TABLE T (id INT, name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED)"
+                        + " WITH ('bucket'='1')");
+
+        sql("INSERT INTO T VALUES (1, 'a')");
+        assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+        sql("INSERT INTO T VALUES (1, 'a')");
+        sql("CALL sys.purge_files(`table` => 'default.T')");
+        assertThat(sql("select * from `T`")).containsExactly();
+
+        sql("INSERT INTO T VALUES (2, 'a')");
+        assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index b2fa66a150..f5052ea25f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -35,6 +35,7 @@ import org.apache.paimon.spark.procedure.MigrateFileProcedure;
 import org.apache.paimon.spark.procedure.MigrateTableProcedure;
 import org.apache.paimon.spark.procedure.Procedure;
 import org.apache.paimon.spark.procedure.ProcedureBuilder;
+import org.apache.paimon.spark.procedure.PurgeFilesProcedure;
 import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure;
 import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
 import org.apache.paimon.spark.procedure.RenameTagProcedure;
@@ -74,6 +75,7 @@ public class SparkProcedures {
         procedureBuilders.put("rollback", RollbackProcedure::builder);
         procedureBuilders.put("rollback_to_timestamp", 
RollbackToTimestampProcedure::builder);
         procedureBuilders.put("rollback_to_watermark", 
RollbackToWatermarkProcedure::builder);
+        procedureBuilders.put("purge_files", PurgeFilesProcedure::builder);
         procedureBuilders.put("create_tag", CreateTagProcedure::builder);
         procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
         procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
new file mode 100644
index 0000000000..8a7aec6e14
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** A procedure to purge files for a table. */
+public class PurgeFilesProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", StringType, true, Metadata.empty())
+                    });
+
+    private PurgeFilesProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    FileStoreTable fileStoreTable = (FileStoreTable) table;
+                    FileIO fileIO = fileStoreTable.fileIO();
+                    Path tablePath = fileStoreTable.snapshotManager().tablePath();
+                    try {
+                        Arrays.stream(fileIO.listStatus(tablePath))
+                                .filter(f -> !f.getPath().getName().contains("schema"))
+                                .forEach(
+                                        fileStatus -> {
+                                            try {
+                                                fileIO.delete(fileStatus.getPath(), true);
+                                            } catch (IOException e) {
+                                                throw new RuntimeException(e);
+                                            }
+                                        });
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    InternalRow outputRow =
+                            newInternalRow(
+                                    UTF8String.fromString(
+                                            String.format(
+                                                    "Successfully purged files for table: %s.",
+                                                    fileStoreTable.name())));
+                    return new InternalRow[] {outputRow};
+                });
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<PurgeFilesProcedure>() {
+            @Override
+            public PurgeFilesProcedure doBuild() {
+                return new PurgeFilesProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "PurgeFilesProcedure";
+    }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
new file mode 100644
index 0000000000..27eafe1c3d
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class PurgeFilesProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon procedure: purge files test") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id STRING, name STRING)
+                 |USING PAIMON
+                 |""".stripMargin)
+
+    spark.sql("insert into T select '1', 'aa'")
+    checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
+
+    spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
+    checkAnswer(spark.sql("select * from test.T"), Nil)
+
+    spark.sql("refresh table test.T")
+    spark.sql("insert into T select '2', 'aa'")
+    checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
+  }
+
+}
