This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 126da62ae [core] Adjust database OrphanFilesClean
126da62ae is described below

commit 126da62ae3e82769196f0585b43767f1ee37d84a
Author: Jingsong <[email protected]>
AuthorDate: Fri Jul 5 21:48:32 2024 +0800

    [core] Adjust database OrphanFilesClean
---
 .../apache/paimon/operation/OrphanFilesClean.java  | 41 +++++++----
 .../flink/action/RemoveOrphanFilesAction.java      | 64 ++----------------
 .../procedure/RemoveOrphanFilesProcedure.java      | 13 ++--
 .../procedure/RemoveOrphanFilesProcedure.java      | 79 ++++------------------
 4 files changed, 54 insertions(+), 143 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index d725f3360..b0b06c33d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -37,7 +37,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -57,6 +56,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -87,7 +89,7 @@ public class OrphanFilesClean {
 
     private static final int READ_FILE_RETRY_NUM = 3;
     private static final int READ_FILE_RETRY_INTERVAL = 5;
-    public static final int SHOW_LIMIT = 200;
+    private static final int SHOW_LIMIT = 200;
 
     private final SnapshotManager snapshotManager;
     private final TagManager tagManager;
@@ -564,10 +566,10 @@ public class OrphanFilesClean {
         return result;
     }
 
-    public static List<Pair<String, OrphanFilesClean>> 
constructOrphanFilesCleans(
+    public static List<OrphanFilesClean> createOrphanFilesCleans(
             Catalog catalog, String databaseName, @Nullable String tableName)
             throws Catalog.DatabaseNotExistException, 
Catalog.TableNotExistException {
-        List<Pair<String, OrphanFilesClean>> orphanFilesCleans = new 
ArrayList<>();
+        List<OrphanFilesClean> orphanFilesCleans = new ArrayList<>();
         List<String> tableNames = Collections.singletonList(tableName);
         if (tableName == null || "*".equals(tableName)) {
             tableNames = catalog.listTables(databaseName);
@@ -581,20 +583,33 @@ public class OrphanFilesClean {
                     "Only FileStoreTable supports remove-orphan-files action. 
The table type is '%s'.",
                     table.getClass().getName());
 
-            orphanFilesCleans.add(Pair.of(t, new 
OrphanFilesClean((FileStoreTable) table)));
+            orphanFilesCleans.add(new OrphanFilesClean((FileStoreTable) 
table));
         }
 
         return orphanFilesCleans;
     }
 
-    public static void initOlderThan(
-            String olderThan, List<Pair<String, OrphanFilesClean>> 
tableOrphanFilesCleans) {
-        tableOrphanFilesCleans.forEach(
-                orphanFilesClean -> 
orphanFilesClean.getRight().olderThan(olderThan));
-    }
+    public static String[] executeOrphanFilesClean(List<OrphanFilesClean> 
tableCleans) {
+        ExecutorService executorService =
+                
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        List<Future<List<Path>>> tasks = new ArrayList<>();
+        for (OrphanFilesClean clean : tableCleans) {
+            tasks.add(executorService.submit(clean::clean));
+        }
+
+        List<Path> cleanOrphanFiles = new ArrayList<>();
+        for (Future<List<Path>> task : tasks) {
+            try {
+                cleanOrphanFiles.addAll(task.get());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
 
-    public static void initDryRun(List<Pair<String, OrphanFilesClean>> 
tableOrphanFilesCleans) {
-        tableOrphanFilesCleans.forEach(
-                orphanFilesClean -> 
orphanFilesClean.getRight().fileCleaner(path -> {}));
+        executorService.shutdownNow();
+        return showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT).toArray(new 
String[0]);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
index df0101deb..c95db99d5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
@@ -19,29 +19,19 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.Pair;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
-import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
+import static 
org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
 
 /** Action to remove the orphan data files and metadata files. */
 public class RemoveOrphanFilesAction extends ActionBase {
 
-    private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
+    private final List<OrphanFilesClean> tableCleans;
 
     public RemoveOrphanFilesAction(
             String warehouse,
@@ -50,60 +40,20 @@ public class RemoveOrphanFilesAction extends ActionBase {
             Map<String, String> catalogConfig)
             throws Catalog.TableNotExistException, 
Catalog.DatabaseNotExistException {
         super(warehouse, catalogConfig);
-        this.tableOrphanFilesCleans =
-                OrphanFilesClean.constructOrphanFilesCleans(catalog, 
databaseName, tableName);
+        this.tableCleans =
+                OrphanFilesClean.createOrphanFilesCleans(catalog, 
databaseName, tableName);
     }
 
     public void olderThan(String olderThan) {
-        OrphanFilesClean.initOlderThan(olderThan, this.tableOrphanFilesCleans);
+        tableCleans.forEach(clean -> clean.olderThan(olderThan));
     }
 
     public void dryRun() {
-        OrphanFilesClean.initDryRun(this.tableOrphanFilesCleans);
-    }
-
-    public static String[] executeOrphanFilesClean(
-            List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
-            throws ExecutionException, InterruptedException {
-        int availableProcessors = Runtime.getRuntime().availableProcessors();
-        ExecutorService executePool =
-                new ThreadPoolExecutor(
-                        availableProcessors,
-                        availableProcessors,
-                        1,
-                        TimeUnit.SECONDS,
-                        new LinkedBlockingQueue<>(),
-                        new ExecutorThreadFactory(
-                                Thread.currentThread().getName() + 
"-RemoveOrphanFiles"));
-
-        List<Future<List<Path>>> tasks = new ArrayList<>();
-        for (Pair<String, OrphanFilesClean> tableOrphanFilesClean : 
tableOrphanFilesCleans) {
-            OrphanFilesClean orphanFilesClean = 
tableOrphanFilesClean.getRight();
-            Future<List<Path>> task =
-                    executePool.submit(
-                            () -> {
-                                try {
-                                    return orphanFilesClean.clean();
-                                } catch (Exception e) {
-                                    throw new RuntimeException(e);
-                                }
-                            });
-            tasks.add(task);
-        }
-
-        List<Path> cleanOrphanFiles = new ArrayList<>();
-        for (Future<List<Path>> task : tasks) {
-            cleanOrphanFiles.addAll(task.get());
-        }
-
-        executePool.shutdownNow();
-
-        return OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
-                .toArray(new String[0]);
+        tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
     }
 
     @Override
     public void run() throws Exception {
-        executeOrphanFilesClean(tableOrphanFilesCleans);
+        executeOrphanFilesClean(tableCleans);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 5c7c75850..d43056f97 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,14 +20,13 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.util.List;
 
-import static 
org.apache.paimon.flink.action.RemoveOrphanFilesAction.executeOrphanFilesClean;
+import static 
org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
 
 /**
  * Remove orphan files procedure. Usage:
@@ -63,18 +62,18 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
         String databaseName = identifier.getDatabaseName();
         String tableName = identifier.getObjectName();
 
-        List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans =
-                OrphanFilesClean.constructOrphanFilesCleans(catalog, 
databaseName, tableName);
+        List<OrphanFilesClean> tableCleans =
+                OrphanFilesClean.createOrphanFilesCleans(catalog, 
databaseName, tableName);
 
         if (!StringUtils.isBlank(olderThan)) {
-            OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
+            tableCleans.forEach(clean -> clean.olderThan(olderThan));
         }
 
         if (dryRun) {
-            OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
+            tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
         }
 
-        return executeOrphanFilesClean(tableOrphanFilesCleans);
+        return executeOrphanFilesClean(tableCleans);
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 6c1ad2405..af973f2a5 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -18,12 +18,8 @@
 
 package org.apache.paimon.spark.procedure;
 
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.spark.catalog.WithPaimonCatalog;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
@@ -37,15 +33,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+import static 
org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -108,10 +99,10 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
         }
         LOG.info("identifier is {}.", identifier);
 
-        List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
+        List<OrphanFilesClean> tableCleans;
         try {
-            tableOrphanFilesCleans =
-                    OrphanFilesClean.constructOrphanFilesCleans(
+            tableCleans =
+                    OrphanFilesClean.createOrphanFilesCleans(
                             ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog(),
                             identifier.getDatabaseName(),
                             identifier.getObjectName());
@@ -121,64 +112,20 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
 
         String olderThan = args.isNullAt(1) ? null : args.getString(1);
         if (!StringUtils.isBlank(olderThan)) {
-            OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
+            tableCleans.forEach(clean -> clean.olderThan(olderThan));
         }
 
         boolean dryRun = !args.isNullAt(2) && args.getBoolean(2);
         if (dryRun) {
-            OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
-        }
-
-        int availableProcessors = Runtime.getRuntime().availableProcessors();
-        ExecutorService executePool =
-                new ThreadPoolExecutor(
-                        availableProcessors,
-                        availableProcessors,
-                        1,
-                        TimeUnit.SECONDS,
-                        new LinkedBlockingQueue<>(),
-                        new ExecutorThreadFactory(
-                                Thread.currentThread().getName() + 
"-RemoveOrphanFiles"));
-        List<Future<List<Path>>> tasks = new ArrayList<>();
-        for (Pair<String, OrphanFilesClean> tableOrphanFilesClean : 
tableOrphanFilesCleans) {
-            String tableName = tableOrphanFilesClean.getLeft();
-            OrphanFilesClean orphanFilesClean = 
tableOrphanFilesClean.getRight();
-            Future<List<Path>> task =
-                    executePool.submit(
-                            () ->
-                                    modifyPaimonTable(
-                                            toIdentifier(tableName, tableName),
-                                            table -> {
-                                                checkArgument(table instanceof 
FileStoreTable);
-                                                try {
-                                                    return 
orphanFilesClean.clean();
-                                                } catch (Exception e) {
-                                                    throw new RuntimeException(
-                                                            "Call 
remove_orphan_files error", e);
-                                                }
-                                            }));
-            tasks.add(task);
+            tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
         }
 
-        List<Path> cleanOrphanFiles = new ArrayList<>();
-        for (Future<List<Path>> task : tasks) {
-            try {
-                cleanOrphanFiles.addAll(task.get());
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        executePool.shutdownNow();
-
-        List<InternalRow> showLimitedDeletedFiles = new 
ArrayList<>(cleanOrphanFiles.size());
-        OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
-                .forEach(
-                        deletedFile ->
-                                showLimitedDeletedFiles.add(
-                                        
newInternalRow(UTF8String.fromString(deletedFile))));
+        String[] result = executeOrphanFilesClean(tableCleans);
+        List<InternalRow> rows = new ArrayList<>();
+        Arrays.stream(result)
+                .forEach(line -> 
rows.add(newInternalRow(UTF8String.fromString(line))));
 
-        return showLimitedDeletedFiles.toArray(new InternalRow[0]);
+        return rows.toArray(new InternalRow[0]);
     }
 
     public static ProcedureBuilder builder() {

Reply via email to