This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4362b79429 [core] Purge files use Table API and no dry run (#5448)
4362b79429 is described below

commit 4362b79429142b0777d162a9d54d4fe1300f4a6c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 11 14:57:43 2025 +0800

    [core] Purge files use Table API and no dry run (#5448)
---
 docs/content/flink/procedures.md                   | 18 +-------
 docs/content/spark/procedures.md                   |  4 +-
 .../paimon/table/AbstractFileStoreTable.java       |  5 +++
 .../java/org/apache/paimon/table/DataTable.java    |  3 ++
 .../paimon/table/DelegatedFileStoreTable.java      |  6 +++
 .../org/apache/paimon/table/FileStoreTable.java    | 47 ++++++++++++++++++++
 .../apache/paimon/table/system/AuditLogTable.java  |  5 +++
 .../paimon/table/system/CompactBucketsTable.java   |  6 +++
 .../paimon/table/system/FileMonitorTable.java      |  6 +++
 .../paimon/table/system/ReadOptimizedTable.java    |  6 +++
 .../flink/procedure/PurgeFilesProcedure.java       | 47 ++------------------
 .../flink/procedure/PurgeFilesProcedure.java       | 50 +++-------------------
 .../flink/procedure/PurgeFilesProcedureITCase.java | 38 +---------------
 .../spark/procedure/PurgeFilesProcedure.java       | 49 +++------------------
 .../spark/procedure/PurgeFilesProcedureTest.scala  | 33 +-------------
 15 files changed, 105 insertions(+), 218 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 26cf30c1df..ffacc3d717 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -486,29 +486,15 @@ All available procedures are listed below.
    <tr>
           <td>purge_files</td>
       <td>
-         -- for Flink 1.18<br/>
-         -- clear table with purge files directly.<br/>
+         -- clear table with purge files.<br/>
          CALL [catalog.]sys.purge_files('identifier')<br/>
-         -- only check what dirs will be deleted, but not really delete them.<br/>
-         CALL [catalog.]sys.purge_files('identifier', true)<br/><br/>
-         -- for Flink 1.19 and later<br/>
-         -- clear table with purge files directly.<br/>
-         CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/>
-         -- only check what dirs will be deleted, but not really delete them.<br/>
-         CALL [catalog.]sys.purge_files(`table` => 'default.T', `dry_run` => true)<br/><br/>
       </td>
       <td>
-         To clear table with purge files directly. Argument:
+         To clear table with purge files. Argument:
             <li>table: the target table identifier. Cannot be empty.</li>
-            <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
       </td>
       <td>
-         -- for Flink 1.18<br/>
          CALL sys.purge_files('default.T')<br/>
-         CALL sys.purge_files('default.T', true)<br/><br/>
-         -- for Flink 1.19 and later<br/>
-         CALL sys.purge_files(`table` => 'default.T')<br/>
-         CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)
       </td>
    </tr>
    <tr>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 07afd315e3..be667e67f2 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -198,13 +198,11 @@ This section introduce all available spark procedures about paimon.
     <tr>
       <td>purge_files</td>
       <td>
-         To clear table with purge files directly. Argument:
+         To clear table with purge files. Argument:
             <li>table: the target table identifier. Cannot be empty.</li>
-            <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
       </td>
       <td>
           CALL sys.purge_files(table => 'default.T')<br/>
-          CALL sys.purge_files(table => 'default.T', dry_run => true)
       </td>
     </tr>
     <tr>
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9793a6a52a..f68bd9615e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -452,6 +452,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
                 options.forceCreatingSnapshot());
     }
 
+    @Override
+    public ConsumerManager consumerManager() {
+        return new ConsumerManager(fileIO, path, snapshotManager().branch());
+    }
+
     @Nullable
     protected Runnable newExpireRunnable() {
         CoreOptions options = coreOptions();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index dd0ab68254..dd39a1610b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.source.DataTableScan;
@@ -42,6 +43,8 @@ public interface DataTable extends InnerTable {
 
     ChangelogManager changelogManager();
 
+    ConsumerManager consumerManager();
+
     SchemaManager schemaManager();
 
     TagManager tagManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 0e0967fe89..dc4aeec073 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -105,6 +106,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
         return wrapped.schemaManager();
     }
 
+    @Override
+    public ConsumerManager consumerManager() {
+        return wrapped.consumerManager();
+    }
+
     @Override
     public TagManager tagManager() {
         return wrapped.tagManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 61aa77d5f3..92b0fc7b6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -20,20 +20,28 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.operation.LocalOrphanFilesClean;
+import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.SegmentsCache;
+import org.apache.paimon.utils.TagManager;
 
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -120,4 +128,43 @@ public interface FileStoreTable extends DataTable {
      */
     @Override
     FileStoreTable switchToBranch(String branchName);
+
+    /** Purge all files in this table. */
+    default void purgeFiles() throws Exception {
+        // clear branches
+        BranchManager branchManager = branchManager();
+        branchManager.branches().forEach(branchManager::dropBranch);
+
+        // clear tags
+        TagManager tagManager = tagManager();
+        tagManager.allTagNames().forEach(this::deleteTag);
+
+        // clear consumers
+        ConsumerManager consumerManager = this.consumerManager();
+        consumerManager.consumers().keySet().forEach(consumerManager::deleteConsumer);
+
+        // truncate table
+        try (BatchTableCommit commit = this.newBatchWriteBuilder().newCommit()) {
+            commit.truncateTable();
+        }
+
+        // clear changelogs
+        ChangelogManager changelogManager = this.changelogManager();
+        this.fileIO().delete(changelogManager.changelogDirectory(), true);
+
+        // clear snapshots, keep only latest snapshot
+        this.newExpireSnapshots()
+                .config(
+                        ExpireConfig.builder()
+                                .snapshotMaxDeletes(Integer.MAX_VALUE)
+                                .snapshotRetainMax(1)
+                                .snapshotRetainMin(1)
+                                .snapshotTimeRetain(Duration.ZERO)
+                                .build())
+                .expire();
+
+        // clear orphan files
+        LocalOrphanFilesClean clean = new LocalOrphanFilesClean(this, System.currentTimeMillis());
+        clean.clean();
+    }
 }
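
For reference, the purgeFiles() default method above can be driven directly
from the Java Table API, without going through a Flink or Spark procedure.
A minimal sketch follows; the filesystem warehouse path /tmp/paimon and the
table name default.T are hypothetical, not part of this change.

    // Sketch only: warehouse path and table name are hypothetical.
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.catalog.CatalogFactory;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.table.FileStoreTable;

    public class PurgeFilesExample {
        public static void main(String[] args) throws Exception {
            // Filesystem catalog over a hypothetical warehouse directory.
            CatalogContext context = CatalogContext.create(new Path("/tmp/paimon"));
            try (Catalog catalog = CatalogFactory.createCatalog(context)) {
                FileStoreTable table =
                        (FileStoreTable) catalog.getTable(Identifier.fromString("default.T"));
                // Drops branches, tags and consumers, truncates the table,
                // deletes changelogs, expires all but the latest snapshot,
                // and finally cleans orphan files, as in the default method above.
                table.purgeFiles();
            }
        }
    }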
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 28b10bddd2..eabb1a6120 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -193,6 +193,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         return wrapped.changelogManager();
     }
 
+    @Override
+    public ConsumerManager consumerManager() {
+        return wrapped.consumerManager();
+    }
+
     @Override
     public SchemaManager schemaManager() {
         return wrapped.schemaManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
index de6b29dee4..8ffae30bd7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.system;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -152,6 +153,11 @@ public class CompactBucketsTable implements DataTable, ReadonlyTable {
         return wrapped.changelogManager();
     }
 
+    @Override
+    public ConsumerManager consumerManager() {
+        return wrapped.consumerManager();
+    }
+
     @Override
     public SchemaManager schemaManager() {
         return wrapped.schemaManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index f3fafaa91e..060570406b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -138,6 +139,11 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
         return wrapped.changelogManager();
     }
 
+    @Override
+    public ConsumerManager consumerManager() {
+        return wrapped.consumerManager();
+    }
+
     @Override
     public SchemaManager schemaManager() {
         return wrapped.schemaManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 1e252fc088..639b711c29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.system;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -173,6 +174,11 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
         return wrapped.changelogManager();
     }
 
+    @Override
+    public ConsumerManager consumerManager() {
+        return wrapped.consumerManager();
+    }
+
     @Override
     public SchemaManager schemaManager() {
         return wrapped.schemaManager();
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 2901fd1f59..a968d441d6 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -18,58 +18,19 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
 /** A procedure to purge files for a table. */
 public class PurgeFilesProcedure extends ProcedureBase {
-    public static final String IDENTIFIER = "purge_files";
 
-    public String[] call(ProcedureContext procedureContext, String tableId)
-            throws Catalog.TableNotExistException {
-        return call(procedureContext, tableId, false);
-    }
+    public static final String IDENTIFIER = "purge_files";
 
-    public String[] call(ProcedureContext procedureContext, String tableId, boolean dryRun)
-            throws Catalog.TableNotExistException {
-        Table table = catalog.getTable(Identifier.fromString(tableId));
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        FileIO fileIO = fileStoreTable.fileIO();
-        Path tablePath = fileStoreTable.snapshotManager().tablePath();
-        ArrayList<String> deleteDir;
-        try {
-            FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
-            deleteDir = new ArrayList<>(fileStatuses.length);
-            Arrays.stream(fileStatuses)
-                    .filter(f -> !f.getPath().getName().contains("schema"))
-                    .forEach(
-                            fileStatus -> {
-                                try {
-                                    deleteDir.add(fileStatus.getPath().getName());
-                                    if (!dryRun) {
-                                        fileIO.delete(fileStatus.getPath(), true);
-                                    }
-                                } catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            });
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return deleteDir.isEmpty()
-                ? new String[] {"There are no dir to be deleted."}
-                : deleteDir.toArray(new String[0]);
+    public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
+        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
+        return new String[] {"Success"};
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 64529db3c8..db35fc6c9d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -18,13 +18,8 @@
 
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
@@ -32,10 +27,6 @@ import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 import org.apache.flink.types.Row;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
 /**
  * A procedure to purge files for a table. Usage:
  *
@@ -48,42 +39,11 @@ public class PurgeFilesProcedure extends ProcedureBase {
 
     public static final String IDENTIFIER = "purge_files";
 
-    @ProcedureHint(
-            argument = {
-                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
-                @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true)
-            })
-    public @DataTypeHint("ROW<purged_file_path STRING>") Row[] call(
-            ProcedureContext procedureContext, String tableId, Boolean dryRun)
-            throws Catalog.TableNotExistException {
-        Table table = catalog.getTable(Identifier.fromString(tableId));
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        FileIO fileIO = fileStoreTable.fileIO();
-        Path tablePath = fileStoreTable.snapshotManager().tablePath();
-        ArrayList<String> deleteDir;
-        try {
-            FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
-            deleteDir = new ArrayList<>(fileStatuses.length);
-            Arrays.stream(fileStatuses)
-                    .filter(f -> !f.getPath().getName().contains("schema"))
-                    .forEach(
-                            fileStatus -> {
-                                try {
-                                    deleteDir.add(fileStatus.getPath().getName());
-                                    if (dryRun == null || !dryRun) {
-                                        fileIO.delete(fileStatus.getPath(), true);
-                                    }
-                                } catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            });
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-        return deleteDir.isEmpty()
-                ? new Row[] {Row.of("There are no dir to be deleted.")}
-                : deleteDir.stream().map(Row::of).toArray(Row[]::new);
+    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
+    public @DataTypeHint("ROW<result STRING>") Row[] call(
+            ProcedureContext procedureContext, String tableId) throws Exception {
+        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
+        return new Row[] {Row.of("Success")};
     }
 
     @Override
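
With the dry_run argument removed, the Flink procedure reduces to a single
named argument. A hedged sketch of calling it from a Flink batch job follows;
the catalog name paimon and the warehouse path are assumptions for
illustration only.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CallPurgeFiles {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
            // Hypothetical catalog name and warehouse path.
            tEnv.executeSql(
                    "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = '/tmp/paimon')");
            tEnv.useCatalog("paimon");
            // Named-argument form (Flink 1.19+); Flink 1.18 uses the positional form.
            tEnv.executeSql("CALL sys.purge_files(`table` => 'default.T')").print();
        }
    }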
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
index 6cd4351783..23ad3c9507 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
@@ -29,7 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
 
     @Test
-    public void testPurgeFiles() throws Exception {
+    public void testPurgeFiles() {
         sql(
                 "CREATE TABLE T (id INT, name STRING,"
                         + " PRIMARY KEY (id) NOT ENFORCED)"
@@ -41,41 +41,7 @@ public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
         sql("INSERT INTO T VALUES (1, 'a')");
         sql("CALL sys.purge_files(`table` => 'default.T')");
         assertThat(sql("select * from `T`")).containsExactly();
-
-        sql("INSERT INTO T VALUES (2, 'a')");
-        assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
-    }
-
-    @Test
-    public void testPurgeFilesDryRun() {
-        sql(
-                "CREATE TABLE T (id INT, name STRING,"
-                        + " PRIMARY KEY (id) NOT ENFORCED)"
-                        + " WITH ('bucket'='1')");
-        // There are no dir to delete.
-        assertThat(
-                        sql("CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)")
-                                .stream()
-                                .map(row -> row.getField(0)))
-                .containsExactlyInAnyOrder("There are no dir to be deleted.");
-
-        sql("INSERT INTO T VALUES (1, 'a')");
-        assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
-
-        // dry run.
-        assertThat(
-                        sql("CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)")
-                                .stream()
-                                .map(row -> row.getField(0)))
-                .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
-
-        assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
-
-        assertThat(
-                        sql("CALL sys.purge_files(`table` => 'default.T')").stream()
-                                .map(row -> row.getField(0)))
-                .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
-        assertThat(sql("select * from `T`")).containsExactly();
+        assertThat(sql("select snapshot_id from `T$snapshots`")).hasSize(1);
 
         sql("INSERT INTO T VALUES (2, 'a')");
         assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
index 3def44b5ae..75444cb0ad 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
@@ -18,9 +18,6 @@
 
 package org.apache.paimon.spark.procedure;
 
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -31,26 +28,18 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /** A procedure to purge files for a table. */
 public class PurgeFilesProcedure extends BaseProcedure {
 
     private static final ProcedureParameter[] PARAMETERS =
-            new ProcedureParameter[] {
-                ProcedureParameter.required("table", StringType),
-                ProcedureParameter.optional("dry_run", BooleanType)
-            };
+            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
 
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("purged_file_path", StringType, true, Metadata.empty())
+                        new StructField("result", StringType, true, Metadata.empty())
                     });
 
     private PurgeFilesProcedure(TableCatalog tableCatalog) {
@@ -70,44 +59,16 @@ public class PurgeFilesProcedure extends BaseProcedure {
     @Override
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-        boolean dryRun = !args.isNullAt(1) && args.getBoolean(1);
-
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
-                    FileStoreTable fileStoreTable = (FileStoreTable) table;
-                    FileIO fileIO = fileStoreTable.fileIO();
-                    Path tablePath = fileStoreTable.snapshotManager().tablePath();
-                    ArrayList<String> deleteDir;
                     try {
-                        FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
-                        deleteDir = new ArrayList<>(fileStatuses.length);
-                        Arrays.stream(fileStatuses)
-                                .filter(f -> !f.getPath().getName().contains("schema"))
-                                .forEach(
-                                        fileStatus -> {
-                                            try {
-                                                deleteDir.add(fileStatus.getPath().getName());
-                                                if (!dryRun) {
-                                                    fileIO.delete(fileStatus.getPath(), true);
-                                                }
-                                            } catch (IOException e) {
-                                                throw new RuntimeException(e);
-                                            }
-                                        });
-                        spark().catalog().refreshTable(table.fullName());
-                    } catch (IOException e) {
+                        ((FileStoreTable) table).purgeFiles();
+                    } catch (Exception e) {
                         throw new RuntimeException(e);
                     }
 
-                    return deleteDir.isEmpty()
-                            ? new InternalRow[] {
-                                newInternalRow(
-                                        UTF8String.fromString("There are no dir to be deleted."))
-                            }
-                            : deleteDir.stream()
-                                    .map(x -> newInternalRow(UTF8String.fromString(x)))
-                                    .toArray(InternalRow[]::new);
+                    return new InternalRow[] {newInternalRow(UTF8String.fromString("Success"))};
                 });
     }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
index 02fa60f1e0..9bf81b56ef 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure
 import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
 
 class PurgeFilesProcedureTest extends PaimonSparkTestBase {
 
@@ -35,40 +36,10 @@ class PurgeFilesProcedureTest extends PaimonSparkTestBase {
 
     spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
     checkAnswer(spark.sql("select * from test.T"), Nil)
+    assertThat(spark.sql("select snapshot_id from test.`T$snapshots`").collect()).hasSize(1)
 
     spark.sql("insert into T select '2', 'aa'");
     checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
   }
 
-  test("Paimon procedure: purge files test with dry run.") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id STRING, name STRING)
-                 |USING PAIMON
-                 |""".stripMargin)
-
-    // There are no dir to be deleted.
-    checkAnswer(
-      spark.sql("CALL paimon.sys.purge_files(table => 'test.T')"),
-      Row("There are no dir to be deleted.") :: Nil
-    )
-
-    spark.sql("insert into T select '1', 'aa'");
-    checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
-
-    // dry run.
-    checkAnswer(
-      spark.sql("CALL paimon.sys.purge_files(table => 'test.T', dry_run => true)"),
-      Row("snapshot") :: Row("bucket-0") :: Row("manifest") :: Nil
-    )
-    checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
-
-    // Do delete.
-    spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
-    checkAnswer(spark.sql("select * from test.T"), Nil)
-
-    // insert new data.
-    spark.sql("insert into T select '2', 'aa'");
-    checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
-  }
-
 }
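
The Scala test above exercises the Spark procedure through SQL. For
completeness, a standalone Java sketch follows; the catalog name paimon, the
warehouse path and the local master are assumptions for illustration only.

    import org.apache.spark.sql.SparkSession;

    public class SparkPurgeFiles {
        public static void main(String[] args) {
            // Hypothetical local setup; CALL statements need the Paimon extensions.
            SparkSession spark =
                    SparkSession.builder()
                            .master("local[2]")
                            .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                            .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon")
                            .config(
                                    "spark.sql.extensions",
                                    "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                            .getOrCreate();
            // Single argument now; the call returns one "Success" row.
            spark.sql("CALL paimon.sys.purge_files(table => 'default.T')").show();
            spark.stop();
        }
    }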
