JingsongLi commented on code in PR #3671:
URL: https://github.com/apache/paimon/pull/3671#discussion_r1666327648


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -18,51 +18,121 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ScanParallelExecutor;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Action to remove the orphan data files and metadata files. */
-public class RemoveOrphanFilesAction extends TableActionBase {
+public class RemoveOrphanFilesAction extends ActionBase {
+
     private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
 
-    private final OrphanFilesClean orphanFilesClean;
+    private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
 
     public RemoveOrphanFilesAction(
             String warehouse,
             String databaseName,
             String tableName,
-            Map<String, String> catalogConfig) {
-        super(warehouse, databaseName, tableName, catalogConfig);
+            Map<String, String> catalogConfig)
+            throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
+        super(warehouse, catalogConfig);
+        this.tableOrphanFilesCleans = new ArrayList<>();
+        constructOrphanFilesCleans(catalog, databaseName, tableName, tableOrphanFilesCleans);
+    }
 
+    public static void constructOrphanFilesCleans(
+            Catalog catalog,
+            String databaseName,
+            String tableName,
+            List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
+            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
+        if ("*".equals(tableName)) {
+            for (String t : catalog.listTables(databaseName)) {
+                constructOrphanFilesCleanInternal(catalog, databaseName, t, tableOrphanFilesCleans);
+            }
+        } else {
+            constructOrphanFilesCleanInternal(
+                    catalog, databaseName, tableName, tableOrphanFilesCleans);
+        }
+    }
+
+    public static void constructOrphanFilesCleanInternal(
+            Catalog catalog,
+            String databaseName,
+            String tableName,
+            List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = new Identifier(databaseName, tableName);
+        Table table = catalog.getTable(identifier);
         checkArgument(
                 table instanceof FileStoreTable,
                 "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
                 table.getClass().getName());
-        this.orphanFilesClean = new OrphanFilesClean((FileStoreTable) table);
+
+        tableOrphanFilesCleans.add(
+                Pair.of(tableName, new OrphanFilesClean((FileStoreTable) table)));
+    }
+
+    public void olderThan(String olderThan) {
+        this.tableOrphanFilesCleans.forEach(
+                orphanFilesClean -> orphanFilesClean.getRight().olderThan(olderThan));
     }
 
-    public RemoveOrphanFilesAction olderThan(String timestamp) {
-        this.orphanFilesClean.olderThan(timestamp);
-        return this;
+    public void dryRun() {
+        this.tableOrphanFilesCleans.forEach(
+                orphanFilesClean -> orphanFilesClean.getRight().fileCleaner(path -> {}));
     }
 
-    public RemoveOrphanFilesAction dryRun() {
-        this.orphanFilesClean.fileCleaner(path -> {});
-        return this;
+    public static String[] doCleanOrphanFiles(
+            List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans) {
+        ForkJoinPool executePool = ScanParallelExecutor.getExecutePool(null);
+        List<ForkJoinTask<List<String>>> tasks = new ArrayList<>();
+        for (Pair<String, OrphanFilesClean> tableOrphanFilesClean : tableOrphanFilesCleans) {
+            String tableName = tableOrphanFilesClean.getLeft();
+            OrphanFilesClean orphanFilesClean = tableOrphanFilesClean.getRight();
+            ForkJoinTask<List<String>> task =
+                    executePool.submit(
+                            () -> {
+                                List<String> result;
+                                try {
+                                    result =
+                                            OrphanFilesClean.showDeletedFiles(
+                                                    orphanFilesClean.clean(), 200);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                                String files = String.join(", ", result);
+                                LOG.info("table {} orphan files : [{}].", tableName, files);
+                                return result;
+                            });
+            tasks.add(task);
+        }
+
+        List<String> cleanOrphanFiles = new ArrayList<>();
+        for (ForkJoinTask<List<String>> task : tasks) {
+            cleanOrphanFiles.addAll(task.join());

Review Comment:
   Just keep 200 lines max in total: each table's list is already capped at 200, but the merged list across all tables is not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to