JingsongLi commented on code in PR #3671:
URL: https://github.com/apache/paimon/pull/3671#discussion_r1666326397
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -18,51 +18,121 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ScanParallelExecutor;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Action to remove the orphan data files and metadata files. */
-public class RemoveOrphanFilesAction extends TableActionBase {
+public class RemoveOrphanFilesAction extends ActionBase {
+
private static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
- private final OrphanFilesClean orphanFilesClean;
+ private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
public RemoveOrphanFilesAction(
String warehouse,
String databaseName,
String tableName,
- Map<String, String> catalogConfig) {
- super(warehouse, databaseName, tableName, catalogConfig);
+ Map<String, String> catalogConfig)
+ throws Catalog.TableNotExistException,
Catalog.DatabaseNotExistException {
+ super(warehouse, catalogConfig);
+ this.tableOrphanFilesCleans = new ArrayList<>();
+ constructOrphanFilesCleans(catalog, databaseName, tableName,
tableOrphanFilesCleans);
+ }
+ public static void constructOrphanFilesCleans(
+ Catalog catalog,
+ String databaseName,
+ String tableName,
+ List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
+ throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
+ if ("*".equals(tableName)) {
+ for (String t : catalog.listTables(databaseName)) {
Review Comment:
Here you can just create a `List<String> tables`; when tableName is not null,
tables is `singletonList(tableName)`.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -18,51 +18,121 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ScanParallelExecutor;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Action to remove the orphan data files and metadata files. */
-public class RemoveOrphanFilesAction extends TableActionBase {
+public class RemoveOrphanFilesAction extends ActionBase {
+
private static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
- private final OrphanFilesClean orphanFilesClean;
+ private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
public RemoveOrphanFilesAction(
String warehouse,
String databaseName,
String tableName,
- Map<String, String> catalogConfig) {
- super(warehouse, databaseName, tableName, catalogConfig);
+ Map<String, String> catalogConfig)
+ throws Catalog.TableNotExistException,
Catalog.DatabaseNotExistException {
+ super(warehouse, catalogConfig);
+ this.tableOrphanFilesCleans = new ArrayList<>();
+ constructOrphanFilesCleans(catalog, databaseName, tableName,
tableOrphanFilesCleans);
+ }
+ public static void constructOrphanFilesCleans(
+ Catalog catalog,
+ String databaseName,
+ String tableName,
+ List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
+ throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
+ if ("*".equals(tableName)) {
+ for (String t : catalog.listTables(databaseName)) {
+ constructOrphanFilesCleanInternal(catalog, databaseName, t,
tableOrphanFilesCleans);
+ }
+ } else {
+ constructOrphanFilesCleanInternal(
+ catalog, databaseName, tableName, tableOrphanFilesCleans);
+ }
+ }
+
+ public static void constructOrphanFilesCleanInternal(
Review Comment:
Just inline this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]