This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 126da62ae [core] Adjust database OrphanFilesClean
126da62ae is described below
commit 126da62ae3e82769196f0585b43767f1ee37d84a
Author: Jingsong <[email protected]>
AuthorDate: Fri Jul 5 21:48:32 2024 +0800
[core] Adjust database OrphanFilesClean
---
.../apache/paimon/operation/OrphanFilesClean.java | 41 +++++++----
.../flink/action/RemoveOrphanFilesAction.java | 64 ++----------------
.../procedure/RemoveOrphanFilesProcedure.java | 13 ++--
.../procedure/RemoveOrphanFilesProcedure.java | 79 ++++------------------
4 files changed, 54 insertions(+), 143 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index d725f3360..b0b06c33d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -37,7 +37,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -57,6 +56,9 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -87,7 +89,7 @@ public class OrphanFilesClean {
private static final int READ_FILE_RETRY_NUM = 3;
private static final int READ_FILE_RETRY_INTERVAL = 5;
- public static final int SHOW_LIMIT = 200;
+ private static final int SHOW_LIMIT = 200;
private final SnapshotManager snapshotManager;
private final TagManager tagManager;
@@ -564,10 +566,10 @@ public class OrphanFilesClean {
return result;
}
- public static List<Pair<String, OrphanFilesClean>>
constructOrphanFilesCleans(
+ public static List<OrphanFilesClean> createOrphanFilesCleans(
Catalog catalog, String databaseName, @Nullable String tableName)
throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
- List<Pair<String, OrphanFilesClean>> orphanFilesCleans = new
ArrayList<>();
+ List<OrphanFilesClean> orphanFilesCleans = new ArrayList<>();
List<String> tableNames = Collections.singletonList(tableName);
if (tableName == null || "*".equals(tableName)) {
tableNames = catalog.listTables(databaseName);
@@ -581,20 +583,33 @@ public class OrphanFilesClean {
"Only FileStoreTable supports remove-orphan-files action.
The table type is '%s'.",
table.getClass().getName());
- orphanFilesCleans.add(Pair.of(t, new
OrphanFilesClean((FileStoreTable) table)));
+ orphanFilesCleans.add(new OrphanFilesClean((FileStoreTable)
table));
}
return orphanFilesCleans;
}
- public static void initOlderThan(
- String olderThan, List<Pair<String, OrphanFilesClean>>
tableOrphanFilesCleans) {
- tableOrphanFilesCleans.forEach(
- orphanFilesClean ->
orphanFilesClean.getRight().olderThan(olderThan));
- }
+    /**
+     * Runs {@link #clean()} of each given cleaner on a fixed-size thread pool and returns the
+     * collected orphan file paths, rendered by {@code showDeletedFiles} with {@code SHOW_LIMIT}.
+     *
+     * <p>The pool is shut down in a {@code finally} block so that a failing or interrupted clean
+     * does not leave the pool's (default, non-daemon) worker threads running after this method
+     * returns or throws.
+     *
+     * @param tableCleans cleaners to run, e.g. as built by {@code createOrphanFilesCleans}
+     * @return the result of {@code showDeletedFiles} over all paths returned by the cleans
+     * @throws RuntimeException wrapping the cause if any clean fails or the wait is interrupted
+     */
+    public static String[] executeOrphanFilesClean(List<OrphanFilesClean> tableCleans) {
+        ExecutorService executorService =
+                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        try {
+            List<Future<List<Path>>> tasks = new ArrayList<>();
+            for (OrphanFilesClean clean : tableCleans) {
+                tasks.add(executorService.submit(clean::clean));
+            }
+
+            List<Path> cleanOrphanFiles = new ArrayList<>();
+            for (Future<List<Path>> task : tasks) {
+                try {
+                    cleanOrphanFiles.addAll(task.get());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
-    public static void initDryRun(List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans) {
-        tableOrphanFilesCleans.forEach(
-                orphanFilesClean -> orphanFilesClean.getRight().fileCleaner(path -> {}));
+
+            return showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT).toArray(new String[0]);
+        } finally {
+            // Also reached when a clean fails: shutdownNow interrupts any cleans still running
+            // and lets the pool's threads terminate instead of leaking.
+            executorService.shutdownNow();
+        }
     }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
index df0101deb..c95db99d5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
@@ -19,29 +19,19 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.Pair;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
+import static
org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
/** Action to remove the orphan data files and metadata files. */
public class RemoveOrphanFilesAction extends ActionBase {
- private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
+ private final List<OrphanFilesClean> tableCleans;
public RemoveOrphanFilesAction(
String warehouse,
@@ -50,60 +40,20 @@ public class RemoveOrphanFilesAction extends ActionBase {
Map<String, String> catalogConfig)
throws Catalog.TableNotExistException,
Catalog.DatabaseNotExistException {
super(warehouse, catalogConfig);
- this.tableOrphanFilesCleans =
- OrphanFilesClean.constructOrphanFilesCleans(catalog,
databaseName, tableName);
+ this.tableCleans =
+ OrphanFilesClean.createOrphanFilesCleans(catalog,
databaseName, tableName);
}
public void olderThan(String olderThan) {
- OrphanFilesClean.initOlderThan(olderThan, this.tableOrphanFilesCleans);
+ tableCleans.forEach(clean -> clean.olderThan(olderThan));
}
public void dryRun() {
- OrphanFilesClean.initDryRun(this.tableOrphanFilesCleans);
- }
-
- public static String[] executeOrphanFilesClean(
- List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
- throws ExecutionException, InterruptedException {
- int availableProcessors = Runtime.getRuntime().availableProcessors();
- ExecutorService executePool =
- new ThreadPoolExecutor(
- availableProcessors,
- availableProcessors,
- 1,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new ExecutorThreadFactory(
- Thread.currentThread().getName() +
"-RemoveOrphanFiles"));
-
- List<Future<List<Path>>> tasks = new ArrayList<>();
- for (Pair<String, OrphanFilesClean> tableOrphanFilesClean :
tableOrphanFilesCleans) {
- OrphanFilesClean orphanFilesClean =
tableOrphanFilesClean.getRight();
- Future<List<Path>> task =
- executePool.submit(
- () -> {
- try {
- return orphanFilesClean.clean();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- tasks.add(task);
- }
-
- List<Path> cleanOrphanFiles = new ArrayList<>();
- for (Future<List<Path>> task : tasks) {
- cleanOrphanFiles.addAll(task.get());
- }
-
- executePool.shutdownNow();
-
- return OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
- .toArray(new String[0]);
+ tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
}
@Override
public void run() throws Exception {
- executeOrphanFilesClean(tableOrphanFilesCleans);
+ executeOrphanFilesClean(tableCleans);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 5c7c75850..d43056f97 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,14 +20,13 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;
import java.util.List;
-import static
org.apache.paimon.flink.action.RemoveOrphanFilesAction.executeOrphanFilesClean;
+import static
org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
/**
* Remove orphan files procedure. Usage:
@@ -63,18 +62,18 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
- List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans =
- OrphanFilesClean.constructOrphanFilesCleans(catalog,
databaseName, tableName);
+ List<OrphanFilesClean> tableCleans =
+ OrphanFilesClean.createOrphanFilesCleans(catalog,
databaseName, tableName);
if (!StringUtils.isBlank(olderThan)) {
- OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
+ tableCleans.forEach(clean -> clean.olderThan(olderThan));
}
if (dryRun) {
- OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
+ tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
}
- return executeOrphanFilesClean(tableOrphanFilesCleans);
+ return executeOrphanFilesClean(tableCleans);
}
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 6c1ad2405..af973f2a5 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -18,12 +18,8 @@
package org.apache.paimon.spark.procedure;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.spark.catalog.WithPaimonCatalog;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -37,15 +33,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+import static
org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -108,10 +99,10 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
}
LOG.info("identifier is {}.", identifier);
- List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
+ List<OrphanFilesClean> tableCleans;
try {
- tableOrphanFilesCleans =
- OrphanFilesClean.constructOrphanFilesCleans(
+ tableCleans =
+ OrphanFilesClean.createOrphanFilesCleans(
((WithPaimonCatalog)
tableCatalog()).paimonCatalog(),
identifier.getDatabaseName(),
identifier.getObjectName());
@@ -121,64 +112,20 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
String olderThan = args.isNullAt(1) ? null : args.getString(1);
if (!StringUtils.isBlank(olderThan)) {
- OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
+ tableCleans.forEach(clean -> clean.olderThan(olderThan));
}
boolean dryRun = !args.isNullAt(2) && args.getBoolean(2);
if (dryRun) {
- OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
- }
-
- int availableProcessors = Runtime.getRuntime().availableProcessors();
- ExecutorService executePool =
- new ThreadPoolExecutor(
- availableProcessors,
- availableProcessors,
- 1,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new ExecutorThreadFactory(
- Thread.currentThread().getName() +
"-RemoveOrphanFiles"));
- List<Future<List<Path>>> tasks = new ArrayList<>();
- for (Pair<String, OrphanFilesClean> tableOrphanFilesClean :
tableOrphanFilesCleans) {
- String tableName = tableOrphanFilesClean.getLeft();
- OrphanFilesClean orphanFilesClean =
tableOrphanFilesClean.getRight();
- Future<List<Path>> task =
- executePool.submit(
- () ->
- modifyPaimonTable(
- toIdentifier(tableName, tableName),
- table -> {
- checkArgument(table instanceof
FileStoreTable);
- try {
- return
orphanFilesClean.clean();
- } catch (Exception e) {
- throw new RuntimeException(
- "Call
remove_orphan_files error", e);
- }
- }));
- tasks.add(task);
+ tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
}
- List<Path> cleanOrphanFiles = new ArrayList<>();
- for (Future<List<Path>> task : tasks) {
- try {
- cleanOrphanFiles.addAll(task.get());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- executePool.shutdownNow();
-
- List<InternalRow> showLimitedDeletedFiles = new
ArrayList<>(cleanOrphanFiles.size());
- OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
- .forEach(
- deletedFile ->
- showLimitedDeletedFiles.add(
-
newInternalRow(UTF8String.fromString(deletedFile))));
+ String[] result = executeOrphanFilesClean(tableCleans);
+ List<InternalRow> rows = new ArrayList<>();
+ Arrays.stream(result)
+ .forEach(line ->
rows.add(newInternalRow(UTF8String.fromString(line))));
- return showLimitedDeletedFiles.toArray(new InternalRow[0]);
+ return rows.toArray(new InternalRow[0]);
}
public static ProcedureBuilder builder() {