This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ccaafc942 [core] support remove orphan files for database (#3671)
ccaafc942 is described below

commit ccaafc9429209b68bdbb1948a7cc6431b02ca207
Author: wangwj <[email protected]>
AuthorDate: Fri Jul 5 21:24:28 2024 +0800

    [core] support remove orphan files for database (#3671)
---
 .../apache/paimon/operation/OrphanFilesClean.java  |  40 ++++++
 .../flink/action/ExpirePartitionsAction.java       |  11 +-
 .../flink/action/RemoveOrphanFilesAction.java      |  91 ++++++++++----
 .../action/RemoveOrphanFilesActionFactory.java     |  25 ++--
 .../procedure/RemoveOrphanFilesProcedure.java      |  27 +++--
 .../action/RemoveOrphanFilesActionITCase.java      |  82 ++++++++++++-
 .../procedure/RemoveOrphanFilesProcedure.java      | 135 ++++++++++++++++-----
 .../procedure/RemoveOrphanFilesProcedureTest.scala |  70 ++++++++++-
 8 files changed, 390 insertions(+), 91 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index bccfee93e..d725f3360 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -21,6 +21,8 @@ package org.apache.paimon.operation;
 import org.apache.paimon.Changelog;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -32,8 +34,10 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -60,6 +64,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * To remove the data files and metadata files that are not used by table 
(so-called "orphan
@@ -82,6 +87,7 @@ public class OrphanFilesClean {
 
     private static final int READ_FILE_RETRY_NUM = 3;
     private static final int READ_FILE_RETRY_INTERVAL = 5;
+    public static final int SHOW_LIMIT = 200;
 
     private final SnapshotManager snapshotManager;
     private final TagManager tagManager;
@@ -557,4 +563,38 @@ public class OrphanFilesClean {
         }
         return result;
     }
+
+    public static List<Pair<String, OrphanFilesClean>> 
constructOrphanFilesCleans(
+            Catalog catalog, String databaseName, @Nullable String tableName)
+            throws Catalog.DatabaseNotExistException, 
Catalog.TableNotExistException {
+        List<Pair<String, OrphanFilesClean>> orphanFilesCleans = new 
ArrayList<>();
+        List<String> tableNames = Collections.singletonList(tableName);
+        if (tableName == null || "*".equals(tableName)) {
+            tableNames = catalog.listTables(databaseName);
+        }
+
+        for (String t : tableNames) {
+            Identifier identifier = new Identifier(databaseName, t);
+            Table table = catalog.getTable(identifier);
+            checkArgument(
+                    table instanceof FileStoreTable,
+                    "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                    table.getClass().getName());
+
+            orphanFilesCleans.add(Pair.of(t, new 
OrphanFilesClean((FileStoreTable) table)));
+        }
+
+        return orphanFilesCleans;
+    }
+
+    public static void initOlderThan(
+            String olderThan, List<Pair<String, OrphanFilesClean>> 
tableOrphanFilesCleans) {
+        tableOrphanFilesCleans.forEach(
+                orphanFilesClean -> 
orphanFilesClean.getRight().olderThan(olderThan));
+    }
+
+    public static void initDryRun(List<Pair<String, OrphanFilesClean>> 
tableOrphanFilesCleans) {
+        tableOrphanFilesCleans.forEach(
+                orphanFilesClean -> 
orphanFilesClean.getRight().fileCleaner(path -> {}));
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 7d1c4dd5e..fda348be8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -28,13 +28,10 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.paimon.catalog.CatalogUtils.table;
-
 /** Expire partitions action for Flink. */
 public class ExpirePartitionsAction extends TableActionBase {
-    private final String expirationTime;
-    private final String timestampFormatter;
-    private PartitionExpire partitionExpire;
+
+    private final PartitionExpire partitionExpire;
 
     public ExpirePartitionsAction(
             String warehouse,
@@ -50,11 +47,9 @@ public class ExpirePartitionsAction extends TableActionBase {
                             "Only FileStoreTable supports expire_partitions 
action. The table type is '%s'.",
                             table.getClass().getName()));
         }
-        this.expirationTime = expirationTime;
-        this.timestampFormatter = timestampFormatter;
 
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        FileStore fileStore = fileStoreTable.store();
+        FileStore<?> fileStore = fileStoreTable.store();
         this.partitionExpire =
                 new PartitionExpire(
                         fileStore.partitionType(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
index 035882603..df0101deb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
@@ -18,51 +18,92 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.Pair;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
 
 /** Action to remove the orphan data files and metadata files. */
-public class RemoveOrphanFilesAction extends TableActionBase {
-    private static final Logger LOG = 
LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+public class RemoveOrphanFilesAction extends ActionBase {
 
-    private final OrphanFilesClean orphanFilesClean;
+    private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
 
     public RemoveOrphanFilesAction(
             String warehouse,
             String databaseName,
-            String tableName,
-            Map<String, String> catalogConfig) {
-        super(warehouse, databaseName, tableName, catalogConfig);
-
-        checkArgument(
-                table instanceof FileStoreTable,
-                "Only FileStoreTable supports remove-orphan-files action. The 
table type is '%s'.",
-                table.getClass().getName());
-        this.orphanFilesClean = new OrphanFilesClean((FileStoreTable) table);
+            @Nullable String tableName,
+            Map<String, String> catalogConfig)
+            throws Catalog.TableNotExistException, 
Catalog.DatabaseNotExistException {
+        super(warehouse, catalogConfig);
+        this.tableOrphanFilesCleans =
+                OrphanFilesClean.constructOrphanFilesCleans(catalog, 
databaseName, tableName);
     }
 
-    public RemoveOrphanFilesAction olderThan(String timestamp) {
-        this.orphanFilesClean.olderThan(timestamp);
-        return this;
+    public void olderThan(String olderThan) {
+        OrphanFilesClean.initOlderThan(olderThan, this.tableOrphanFilesCleans);
     }
 
-    public RemoveOrphanFilesAction dryRun() {
-        this.orphanFilesClean.fileCleaner(path -> {});
-        return this;
+    public void dryRun() {
+        OrphanFilesClean.initDryRun(this.tableOrphanFilesCleans);
+    }
+
+    public static String[] executeOrphanFilesClean(
+            List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
+            throws ExecutionException, InterruptedException {
+        int availableProcessors = Runtime.getRuntime().availableProcessors();
+        ExecutorService executePool =
+                new ThreadPoolExecutor(
+                        availableProcessors,
+                        availableProcessors,
+                        1,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new ExecutorThreadFactory(
+                                Thread.currentThread().getName() + 
"-RemoveOrphanFiles"));
+
+        List<Future<List<Path>>> tasks = new ArrayList<>();
+        for (Pair<String, OrphanFilesClean> tableOrphanFilesClean : 
tableOrphanFilesCleans) {
+            OrphanFilesClean orphanFilesClean = 
tableOrphanFilesClean.getRight();
+            Future<List<Path>> task =
+                    executePool.submit(
+                            () -> {
+                                try {
+                                    return orphanFilesClean.clean();
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                            });
+            tasks.add(task);
+        }
+
+        List<Path> cleanOrphanFiles = new ArrayList<>();
+        for (Future<List<Path>> task : tasks) {
+            cleanOrphanFiles.addAll(task.get());
+        }
+
+        executePool.shutdownNow();
+
+        return OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
+                .toArray(new String[0]);
     }
 
     @Override
     public void run() throws Exception {
-        List<String> result = 
OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200);
-        String files = String.join(", ", result);
-        LOG.info("orphan files: [{}]", files);
+        executeOrphanFilesClean(tableOrphanFilesCleans);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
index 0a461d839..769453d4a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
@@ -18,16 +18,15 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.flink.api.java.tuple.Tuple3;
-
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
 /** Factory to create {@link RemoveOrphanFilesAction}. */
 public class RemoveOrphanFilesActionFactory implements ActionFactory {
 
     public static final String IDENTIFIER = "remove_orphan_files";
-
     private static final String OLDER_THAN = "older_than";
     private static final String DRY_RUN = "dry_run";
 
@@ -38,12 +37,20 @@ public class RemoveOrphanFilesActionFactory implements 
ActionFactory {
 
     @Override
     public Optional<Action> create(MultipleParameterToolAdapter params) {
-        Tuple3<String, String, String> tablePath = getTablePath(params);
+        String warehouse = params.get(WAREHOUSE);
+        checkNotNull(warehouse);
+        String database = params.get(DATABASE);
+        checkNotNull(database);
+        String table = params.get(TABLE);
+
         Map<String, String> catalogConfig = optionalConfigMap(params, 
CATALOG_CONF);
 
-        RemoveOrphanFilesAction action =
-                new RemoveOrphanFilesAction(
-                        tablePath.f0, tablePath.f1, tablePath.f2, 
catalogConfig);
+        RemoveOrphanFilesAction action;
+        try {
+            action = new RemoveOrphanFilesAction(warehouse, database, table, 
catalogConfig);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
 
         if (params.has(OLDER_THAN)) {
             action.olderThan(params.get(OLDER_THAN));
@@ -76,5 +83,9 @@ public class RemoveOrphanFilesActionFactory implements 
ActionFactory {
         System.out.println(
                 "When '--dry_run true', view only orphan files, don't actually 
remove files. Default is false.");
         System.out.println();
+
+        System.out.println(
+                "If the table is null or *, all orphan files in all tables under the db will be cleaned up.");
+        System.out.println();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 26b637a35..5c7c75850 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,13 +20,14 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import java.util.List;
+
+import static 
org.apache.paimon.flink.action.RemoveOrphanFilesAction.executeOrphanFilesClean;
 
 /**
  * Remove orphan files procedure. Usage:
@@ -37,6 +38,9 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  *
  *  -- use custom file delete interval
  *  CALL sys.remove_orphan_files('tableId', '2023-12-31 23:59:59')
+ *
+ *  -- remove all tables' orphan files in db
+ *  CALL sys.remove_orphan_files('databaseName.*', '2023-12-31 23:59:59')
  * </code></pre>
  */
 public class RemoveOrphanFilesProcedure extends ProcedureBase {
@@ -56,24 +60,21 @@ public class RemoveOrphanFilesProcedure extends 
ProcedureBase {
             ProcedureContext procedureContext, String tableId, String 
olderThan, boolean dryRun)
             throws Exception {
         Identifier identifier = Identifier.fromString(tableId);
-        Table table = catalog.getTable(identifier);
+        String databaseName = identifier.getDatabaseName();
+        String tableName = identifier.getObjectName();
 
-        checkArgument(
-                table instanceof FileStoreTable,
-                "Only FileStoreTable supports remove-orphan-files action. The 
table type is '%s'.",
-                table.getClass().getName());
+        List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans =
+                OrphanFilesClean.constructOrphanFilesCleans(catalog, 
databaseName, tableName);
 
-        OrphanFilesClean orphanFilesClean = new 
OrphanFilesClean((FileStoreTable) table);
         if (!StringUtils.isBlank(olderThan)) {
-            orphanFilesClean.olderThan(olderThan);
+            OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
         }
 
         if (dryRun) {
-            orphanFilesClean.fileCleaner(path -> {});
+            OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
         }
 
-        return OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200)
-                .toArray(new String[0]);
+        return executeOrphanFilesClean(tableOrphanFilesCleans);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 0c335c726..9e7b6a7c3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -44,8 +44,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 /** IT cases for {@link RemoveOrphanFilesAction}. */
 public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
 
-    @Test
-    public void testRunWithoutException() throws Exception {
+    private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
+    private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";
+
+    private FileStoreTable createTableAndWriteData(String tableName) throws 
Exception {
         RowType rowType =
                 RowType.of(
                         new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
@@ -53,6 +55,7 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
 
         FileStoreTable table =
                 createFileStoreTable(
+                        tableName,
                         rowType,
                         Collections.emptyList(),
                         Collections.singletonList("k"),
@@ -65,14 +68,27 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
 
         writeData(rowData(1L, BinaryString.fromString("Hi")));
 
-        Path orphanFile1 = new Path(table.location(), "bucket-0/orphan_file1");
-        Path orphanFile2 = new Path(table.location(), "bucket-0/orphan_file2");
+        Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
+        Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
 
         FileIO fileIO = table.fileIO();
         fileIO.writeFile(orphanFile1, "a", true);
         Thread.sleep(2000);
         fileIO.writeFile(orphanFile2, "b", true);
 
+        return table;
+    }
+
+    private Path getOrphanFilePath(FileStoreTable table, String orphanFile) {
+        return new Path(table.location(), orphanFile);
+    }
+
+    @Test
+    public void testRunWithoutException() throws Exception {
+        FileStoreTable table = createTableAndWriteData(tableName);
+        Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
+        Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
+
         List<String> args =
                 new ArrayList<>(
                         Arrays.asList(
@@ -117,4 +133,62 @@ public class RemoveOrphanFilesActionITCase extends 
ActionITCaseBase {
                         Row.of(orphanFile1.toUri().getPath()),
                         Row.of(orphanFile2.toUri().getPath()));
     }
+
+    @Test
+    public void testRemoveDatabaseOrphanFilesITCase() throws Exception {
+        FileStoreTable table1 = createTableAndWriteData("tableName1");
+        Path orphanFile11 = getOrphanFilePath(table1, ORPHAN_FILE_1);
+        Path orphanFile12 = getOrphanFilePath(table1, ORPHAN_FILE_2);
+        FileStoreTable table2 = createTableAndWriteData("tableName2");
+        Path orphanFile21 = getOrphanFilePath(table2, ORPHAN_FILE_1);
+        Path orphanFile22 = getOrphanFilePath(table2, ORPHAN_FILE_2);
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "remove_orphan_files",
+                                "--warehouse",
+                                warehouse,
+                                "--database",
+                                database,
+                                "--table",
+                                "*"));
+        RemoveOrphanFilesAction action1 = 
createAction(RemoveOrphanFilesAction.class, args);
+        assertThatCode(action1::run).doesNotThrowAnyException();
+
+        args.add("--older_than");
+        args.add("2023-12-31 23:59:59");
+        RemoveOrphanFilesAction action2 = 
createAction(RemoveOrphanFilesAction.class, args);
+        assertThatCode(action2::run).doesNotThrowAnyException();
+
+        String withoutOlderThan =
+                String.format("CALL sys.remove_orphan_files('%s.%s')", 
database, "*");
+        CloseableIterator<Row> withoutOlderThanCollect = 
callProcedure(withoutOlderThan);
+        
assertThat(ImmutableList.copyOf(withoutOlderThanCollect).size()).isEqualTo(0);
+
+        String withDryRun =
+                String.format(
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)",
+                        database, "*");
+        ImmutableList<Row> actualDryRunDeleteFile = 
ImmutableList.copyOf(callProcedure(withDryRun));
+        assertThat(actualDryRunDeleteFile)
+                .containsExactlyInAnyOrder(
+                        Row.of(orphanFile11.toUri().getPath()),
+                        Row.of(orphanFile12.toUri().getPath()),
+                        Row.of(orphanFile21.toUri().getPath()),
+                        Row.of(orphanFile22.toUri().getPath()));
+
+        String withOlderThan =
+                String.format(
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
+                        database, "*");
+        ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(callProcedure(withOlderThan));
+
+        assertThat(actualDeleteFile)
+                .containsExactlyInAnyOrder(
+                        Row.of(orphanFile11.toUri().getPath()),
+                        Row.of(orphanFile12.toUri().getPath()),
+                        Row.of(orphanFile21.toUri().getPath()),
+                        Row.of(orphanFile22.toUri().getPath()));
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index b88b2d169..6c1ad2405 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -18,20 +18,33 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.OrphanFilesClean;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -41,10 +54,15 @@ import static 
org.apache.spark.sql.types.DataTypes.StringType;
  *
  * <pre><code>
  *  CALL sys.remove_orphan_files(table => 'tableId', [older_than => '2023-10-31 12:00:00'])
+ *
+ *  CALL sys.remove_orphan_files(table => 'databaseName.*', [older_than => '2023-10-31 12:00:00'])
  * </code></pre>
  */
 public class RemoveOrphanFilesProcedure extends BaseProcedure {
 
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class.getName());
+
     private static final ProcedureParameter[] PARAMETERS =
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
@@ -74,36 +92,93 @@ public class RemoveOrphanFilesProcedure extends 
BaseProcedure {
 
     @Override
     public InternalRow[] call(InternalRow args) {
-        Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+        org.apache.paimon.catalog.Identifier identifier;
+        String tableId = args.getString(0);
+        Preconditions.checkArgument(
+                tableId != null && !tableId.isEmpty(),
+                "Cannot handle an empty tableId for argument %s",
+                tableId);
+
+        if (tableId.endsWith(".*")) {
+            identifier = 
org.apache.paimon.catalog.Identifier.fromString(tableId);
+        } else {
+            identifier =
+                    org.apache.paimon.catalog.Identifier.fromString(
+                            toIdentifier(args.getString(0), 
PARAMETERS[0].name()).toString());
+        }
+        LOG.info("identifier is {}.", identifier);
+
+        List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
+        try {
+            tableOrphanFilesCleans =
+                    OrphanFilesClean.constructOrphanFilesCleans(
+                            ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog(),
+                            identifier.getDatabaseName(),
+                            identifier.getObjectName());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
         String olderThan = args.isNullAt(1) ? null : args.getString(1);
-        boolean dryRun = args.isNullAt(2) ? false : args.getBoolean(2);
-
-        return modifyPaimonTable(
-                tableIdent,
-                table -> {
-                    checkArgument(table instanceof FileStoreTable);
-                    OrphanFilesClean orphanFilesClean =
-                            new OrphanFilesClean((FileStoreTable) table);
-                    if (!StringUtils.isBlank(olderThan)) {
-                        orphanFilesClean.olderThan(olderThan);
-                    }
-                    if (dryRun) {
-                        orphanFilesClean.fileCleaner(path -> {});
-                    }
-                    try {
-                        List<String> result =
-                                
OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200);
-                        InternalRow[] rows = new InternalRow[result.size()];
-                        int index = 0;
-                        for (String line : result) {
-                            rows[index] = 
newInternalRow(UTF8String.fromString(line));
-                            index++;
-                        }
-                        return rows;
-                    } catch (Exception e) {
-                        throw new RuntimeException("Call remove_orphan_files 
error", e);
-                    }
-                });
+        if (!StringUtils.isBlank(olderThan)) {
+            OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
+        }
+
+        boolean dryRun = !args.isNullAt(2) && args.getBoolean(2);
+        if (dryRun) {
+            OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
+        }
+
+        int availableProcessors = Runtime.getRuntime().availableProcessors();
+        ExecutorService executePool =
+                new ThreadPoolExecutor(
+                        availableProcessors,
+                        availableProcessors,
+                        1,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new ExecutorThreadFactory(
+                                Thread.currentThread().getName() + 
"-RemoveOrphanFiles"));
+        List<Future<List<Path>>> tasks = new ArrayList<>();
+        for (Pair<String, OrphanFilesClean> tableOrphanFilesClean : 
tableOrphanFilesCleans) {
+            String tableName = tableOrphanFilesClean.getLeft();
+            OrphanFilesClean orphanFilesClean = 
tableOrphanFilesClean.getRight();
+            Future<List<Path>> task =
+                    executePool.submit(
+                            () ->
+                                    modifyPaimonTable(
+                                            toIdentifier(tableName, tableName),
+                                            table -> {
+                                                checkArgument(table instanceof 
FileStoreTable);
+                                                try {
+                                                    return 
orphanFilesClean.clean();
+                                                } catch (Exception e) {
+                                                    throw new RuntimeException(
+                                                            "Call 
remove_orphan_files error", e);
+                                                }
+                                            }));
+            tasks.add(task);
+        }
+
+        List<Path> cleanOrphanFiles = new ArrayList<>();
+        for (Future<List<Path>> task : tasks) {
+            try {
+                cleanOrphanFiles.addAll(task.get());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        executePool.shutdownNow();
+
+        List<InternalRow> showLimitedDeletedFiles = new 
ArrayList<>(cleanOrphanFiles.size());
+        OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
+                .forEach(
+                        deletedFile ->
+                                showLimitedDeletedFiles.add(
+                                        
newInternalRow(UTF8String.fromString(deletedFile))));
+
+        return showLimitedDeletedFiles.toArray(new InternalRow[0]);
     }
 
     public static ProcedureBuilder builder() {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index c0bf84c4d..23a014d0f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit
 
 class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
+  private val ORPHAN_FILE_1 = "bucket-0/orphan_file1" // relative path of the first orphan file; the tests resolve it against a table location
+  private val ORPHAN_FILE_2 = "bucket-0/orphan_file2" // relative path of the second orphan file; the tests resolve it against a table location
+
   test("Paimon procedure: remove orphan files") {
     spark.sql(s"""
                  |CREATE TABLE T (id STRING, name STRING)
@@ -41,8 +44,8 @@ class RemoveOrphanFilesProcedureTest extends 
PaimonSparkTestBase {
     val fileIO = table.fileIO()
     val tablePath = table.location()
 
-    val orphanFile1 = new Path(tablePath, "bucket-0/orphan_file1")
-    val orphanFile2 = new Path(tablePath, "bucket-0/orphan_file2")
+    val orphanFile1 = new Path(tablePath, ORPHAN_FILE_1)
+    val orphanFile2 = new Path(tablePath, ORPHAN_FILE_2)
 
     fileIO.tryToWriteAtomic(orphanFile1, "a")
     Thread.sleep(2000)
@@ -84,8 +87,8 @@ class RemoveOrphanFilesProcedureTest extends 
PaimonSparkTestBase {
     val fileIO = table.fileIO()
     val tablePath = table.location()
 
-    val orphanFile1 = new Path(tablePath, "bucket-0/orphan_file1")
-    val orphanFile2 = new Path(tablePath, "bucket-0/orphan_file2")
+    val orphanFile1 = new Path(tablePath, ORPHAN_FILE_1)
+    val orphanFile2 = new Path(tablePath, ORPHAN_FILE_2)
 
     fileIO.writeFile(orphanFile1, "a", true)
     Thread.sleep(2000)
@@ -104,4 +107,63 @@ class RemoveOrphanFilesProcedureTest extends 
PaimonSparkTestBase {
       Row(orphanFile1.toUri.getPath) :: Row(orphanFile2.toUri.getPath) :: Nil
     )
   }
+
+  test("Paimon procedure: remove database orphan files") { // plants orphan files in two tables and removes them with calls that pass table => 'test.*'
+    spark.sql(s"""
+                 |CREATE TABLE T1 (id STRING, name STRING)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('primary-key'='id')
+                 |""".stripMargin)
+    spark.sql(s"INSERT INTO T1 VALUES ('1', 'a'), ('2', 'b')")
+
+    spark.sql(s"""
+                 |CREATE TABLE T2 (id STRING, name STRING)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('primary-key'='id')
+                 |""".stripMargin)
+    spark.sql(s"INSERT INTO T2 VALUES ('1', 'a'), ('2', 'b')")
+
+    val table1 = loadTable("T1")
+    val table2 = loadTable("T2")
+    val fileIO1 = table1.fileIO()
+    val fileIO2 = table2.fileIO()
+    val tablePath1 = table1.location()
+    val tablePath2 = table2.location()
+
+    val orphanFile11 = new Path(tablePath1, ORPHAN_FILE_1) // naming: orphanFile<table><file>, e.g. 12 = table 1, file 2
+    val orphanFile12 = new Path(tablePath1, ORPHAN_FILE_2)
+    val orphanFile21 = new Path(tablePath2, ORPHAN_FILE_1)
+    val orphanFile22 = new Path(tablePath2, ORPHAN_FILE_2)
+
+    fileIO1.tryToWriteAtomic(orphanFile11, "a") // first batch: one orphan file per table
+    fileIO2.tryToWriteAtomic(orphanFile21, "a")
+    Thread.sleep(2000) // gap so the first batch is clearly older than the second batch written below
+    fileIO1.tryToWriteAtomic(orphanFile12, "b") // second batch: one more orphan file per table
+    fileIO2.tryToWriteAtomic(orphanFile22, "b")
+
+    // by default (no older_than argument), no file deleted: the call is expected to return no rows
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), 
Nil)
+
+    val orphanFile12ModTime = 
fileIO1.getFileStatus(orphanFile12).getModificationTime
+    val older_than1 = DateTimeUtils.formatLocalDateTime( // threshold: one second before orphanFile12's modification time
+      DateTimeUtils.toLocalDateTime(
+        orphanFile12ModTime -
+          TimeUnit.SECONDS.toMillis(1)),
+      3)
+
+    checkAnswer(
+      spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than 
=> '$older_than1')"),
+      Row(orphanFile11.toUri.getPath) :: Row(orphanFile21.toUri.getPath) :: Nil // expected: only the first-batch file of each table
+    )
+
+    val older_than2 = DateTimeUtils.formatLocalDateTime( // threshold: the current wall-clock time
+      DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
+      3)
+
+    checkAnswer(
+      spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than 
=> '$older_than2')"),
+      Row(orphanFile12.toUri.getPath) :: Row(orphanFile22.toUri.getPath) :: Nil // expected: the remaining second-batch file of each table
+    )
+  }
+
 }


Reply via email to