wwj6591812 commented on code in PR #3671:
URL: https://github.com/apache/paimon/pull/3671#discussion_r1666590961
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java:
##########
@@ -37,6 +40,9 @@
*
* -- use custom file delete interval
* CALL sys.remove_orphan_files('tableId', '2023-12-31 23:59:59')
+ *
+ * -- remove all tables' orphan files in db
+ * CALL sys.remove_orphan_files('databaseName.*', '2023-12-31 23:59:59')
* </code></pre>
*/
public class RemoveOrphanFilesProcedure extends ProcedureBase {
Review Comment:
done
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -18,51 +18,121 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ScanParallelExecutor;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Action to remove the orphan data files and metadata files. */
-public class RemoveOrphanFilesAction extends TableActionBase {
+public class RemoveOrphanFilesAction extends ActionBase {
+
private static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
- private final OrphanFilesClean orphanFilesClean;
+ private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
public RemoveOrphanFilesAction(
String warehouse,
String databaseName,
String tableName,
- Map<String, String> catalogConfig) {
- super(warehouse, databaseName, tableName, catalogConfig);
+ Map<String, String> catalogConfig)
+ throws Catalog.TableNotExistException,
Catalog.DatabaseNotExistException {
+ super(warehouse, catalogConfig);
+ this.tableOrphanFilesCleans = new ArrayList<>();
+ constructOrphanFilesCleans(catalog, databaseName, tableName,
tableOrphanFilesCleans);
+ }
+ public static void constructOrphanFilesCleans(
+ Catalog catalog,
+ String databaseName,
+ String tableName,
+ List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
+ throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
+ if ("*".equals(tableName)) {
+ for (String t : catalog.listTables(databaseName)) {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org