This is an automated email from the ASF dual-hosted git repository. czy006 pushed a commit to branch 0.9.x in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 1b0faed36863b2b6b04808e6cbcb52fb1ef9380b Author: WenLingzhang <[email protected]> AuthorDate: Tue May 19 10:24:53 2026 +0800 [AMORO-4216] Refactor dangling-delete-files-cleaning via ProcessFactory plugin (#4214) * Refactor dangling-delete-files-cleaning via ProcessFactory plugin # Conflicts: # amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java # amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java # amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java * Fix IcebergActions init failure by increasing MAX_NAME_LENGTH to 32 * Rename action to clean-dangling-delete-files to fix pool tag mismatch * Fix: LocalProcess exceptions were swallowed, preventing state from being updated to FAILED Problem Analysis: - In DanglingDeleteFilesCleaningProcess, SnapshotsExpiringProcess, and OrphanFilesCleaningProcess, exceptions in the run() method were caught but not re-thrown - This caused LocalExecutionEngine.ProcessHolder.onComplete() to never detect failures - Process status was always set to SUCCESS even when execution actually failed Fix: - Add `throw new RuntimeException(t);` in the catch block to re-throw exceptions - Keep existing logging for troubleshooting - Ensure exceptions properly propagate to ProcessHolder so status is correctly updated to FAILED Modified Files: - amoro-ams/src/main/java/.../DanglingDeleteFilesCleaningProcess.java - amoro-ams/src/main/java/.../SnapshotsExpiringProcess.java - amoro-ams/src/main/java/.../OrphanFilesCleaningProcess.java * Fix logging message for dangling delete files error * Fix formatting of class documentation comment * fixup style --------- Co-authored-by: 张文领 <[email protected]> (cherry picked from commit 0c5d3d84c6ba544adb10bdd4d4893de489e3dd4b) --- .../apache/amoro/server/AmoroManagementConf.java | 18 ----- .../apache/amoro/server/AmoroServiceContainer.java | 1 - ...ava => DanglingDeleteFilesCleaningProcess.java} | 21 +++-- 
.../process/iceberg/IcebergProcessFactory.java | 34 ++++++++ .../iceberg/OrphanFilesCleaningProcess.java | 1 + .../process/iceberg/SnapshotsExpiringProcess.java | 3 +- .../DanglingDeleteFilesCleaningExecutor.java | 92 ---------------------- .../scheduler/inline/InlineTableExecutors.java | 12 --- .../amoro/server/table/DefaultTableRuntime.java | 5 -- .../server/table/cleanup/CleanupOperation.java | 1 - .../table/cleanup/TableRuntimeCleanupState.java | 4 +- .../process/iceberg/TestIcebergProcessFactory.java | 18 +++++ .../inline/PeriodicTableSchedulerTestBase.java | 3 - .../inline/TestConfigurableIntervalExecutors.java | 39 +-------- .../inline/TestPeriodicTableSchedulerCleanup.java | 15 +--- .../src/main/java/org/apache/amoro/Action.java | 2 +- .../main/java/org/apache/amoro/IcebergActions.java | 1 + .../amoro/process/TestLocalExecutionEngine.java | 2 + charts/amoro/templates/amoro-configmap.yaml | 4 - dist/src/main/amoro-bin/conf/config.yaml | 4 - .../amoro-bin/conf/plugins/execute-engines.yaml | 1 + .../amoro-bin/conf/plugins/process-factories.yaml | 2 + docs/admin-guides/deployment.md | 3 + docs/configuration/ams-config.md | 3 - 24 files changed, 85 insertions(+), 204 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 49eade550..759b76700 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -174,24 +174,6 @@ public class AmoroManagementConf { .defaultValue(Duration.ofHours(1)) .withDescription("Interval for expiring snapshots."); - public static final ConfigOption<Boolean> CLEAN_DANGLING_DELETE_FILES_ENABLED = - ConfigOptions.key("clean-dangling-delete-files.enabled") - .booleanType() - .defaultValue(true) - .withDescription("Enable dangling delete files cleaning."); - - public static final ConfigOption<Integer> 
CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT = - ConfigOptions.key("clean-dangling-delete-files.thread-count") - .intType() - .defaultValue(10) - .withDescription("The number of threads used for dangling delete files cleaning."); - - public static final ConfigOption<Duration> CLEAN_DANGLING_DELETE_FILES_INTERVAL = - ConfigOptions.key("clean-dangling-delete-files.interval") - .durationType() - .defaultValue(Duration.ofDays(1)) - .withDescription("Interval for cleaning dangling delete files."); - public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED = ConfigOptions.key("sync-hive-tables.enabled") .booleanType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index d174e559e..27615ab31 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -290,7 +290,6 @@ public class AmoroServiceContainer { addHandlerChain(optimizingService.getTableRuntimeHandler()); addHandlerChain(processService.getTableHandlerChain()); addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java similarity index 74% copy from amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java copy to 
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java index 22e4c05ee..5c5fef19c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java @@ -34,12 +34,13 @@ import org.slf4j.LoggerFactory; import java.util.Map; -/** Local table process for cleaning Iceberg orphan files. */ -public class OrphanFilesCleaningProcess extends TableProcess implements LocalProcess { +/** Local table process for expiring Iceberg dangling delete files. */ +public class DanglingDeleteFilesCleaningProcess extends TableProcess implements LocalProcess { - private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesCleaningProcess.class); + private static final Logger LOG = + LoggerFactory.getLogger(DanglingDeleteFilesCleaningProcess.class); - public OrphanFilesCleaningProcess(TableRuntime tableRuntime, ExecuteEngine engine) { + public DanglingDeleteFilesCleaningProcess(TableRuntime tableRuntime, ExecuteEngine engine) { super(tableRuntime, engine); } @@ -53,18 +54,22 @@ public class OrphanFilesCleaningProcess extends TableProcess implements LocalPro try { AmoroTable<?> amoroTable = tableRuntime.loadTable(); TableMaintainer tableMaintainer = TableMaintainerFactory.create(amoroTable, tableRuntime); - tableMaintainer.cleanOrphanFiles(); + tableMaintainer.cleanDanglingDeleteFiles(); tableRuntime.updateState( DefaultTableRuntime.CLEANUP_STATE_KEY, - cleanUp -> cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis())); + cleanUp -> cleanUp.setLastDanglingDeleteFilesCleanTime(System.currentTimeMillis())); } catch (Throwable t) { - LOG.error("Failed to clean orphan files for table {}", tableRuntime.getTableIdentifier(), t); + LOG.error( + "unexpected dangling delete files cleaning error of table {}", + tableRuntime.getTableIdentifier(), + t); + throw new RuntimeException(t); } } 
@Override public Action getAction() { - return IcebergActions.DELETE_ORPHANS; + return IcebergActions.CLEAN_DANGLING_DELETE; } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index 1f7813476..813892595 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -65,6 +65,14 @@ public class IcebergProcessFactory implements ProcessFactory { .durationType() .defaultValue(Duration.ofDays(1)); + public static final ConfigOption<Boolean> DANGLING_DELETE_FILES_CLEANING_ENABLED = + ConfigOptions.key("clean-dangling-delete-files.enabled").booleanType().defaultValue(true); + + public static final ConfigOption<Duration> DANGLING_DELETE_FILES_CLEANING_INTERVAL = + ConfigOptions.key("clean-dangling-delete-files.interval") + .durationType() + .defaultValue(Duration.ofDays(1)); + private ExecuteEngine localEngine; private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap(); private final List<TableFormat> formats = @@ -101,6 +109,8 @@ public class IcebergProcessFactory implements ProcessFactory { return triggerExpireSnapshot(tableRuntime); } else if (IcebergActions.DELETE_ORPHANS.equals(action)) { return triggerCleanOrphans(tableRuntime); + } else if (IcebergActions.CLEAN_DANGLING_DELETE.equals(action)) { + return triggerCleanDanglingDelete(tableRuntime); } return Optional.empty(); @@ -130,6 +140,12 @@ public class IcebergProcessFactory implements ProcessFactory { this.actions.put( IcebergActions.DELETE_ORPHANS, ProcessTriggerStrategy.triggerAtFixRate(interval)); } + + if (configs.getBoolean(DANGLING_DELETE_FILES_CLEANING_ENABLED)) { + Duration interval = configs.getDuration(DANGLING_DELETE_FILES_CLEANING_INTERVAL); + this.actions.put( + 
IcebergActions.CLEAN_DANGLING_DELETE, ProcessTriggerStrategy.triggerAtFixRate(interval)); + } } private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) { @@ -162,6 +178,24 @@ public class IcebergProcessFactory implements ProcessFactory { return Optional.of(new OrphanFilesCleaningProcess(tableRuntime, localEngine)); } + private Optional<TableProcess> triggerCleanDanglingDelete(TableRuntime tableRuntime) { + if (localEngine == null + || !tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled()) { + return Optional.empty(); + } + + long lastExecuteTime = + tableRuntime + .getState(DefaultTableRuntime.CLEANUP_STATE_KEY) + .getLastDanglingDeleteFilesCleanTime(); + ProcessTriggerStrategy strategy = actions.get(IcebergActions.CLEAN_DANGLING_DELETE); + if (System.currentTimeMillis() - lastExecuteTime < strategy.getTriggerInterval().toMillis()) { + return Optional.empty(); + } + + return Optional.of(new DanglingDeleteFilesCleaningProcess(tableRuntime, localEngine)); + } + @Override public void close() {} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java index 22e4c05ee..b17f44f2c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java @@ -59,6 +59,7 @@ public class OrphanFilesCleaningProcess extends TableProcess implements LocalPro cleanUp -> cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis())); } catch (Throwable t) { LOG.error("Failed to clean orphan files for table {}", tableRuntime.getTableIdentifier(), t); + throw new RuntimeException(t); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java index 3aea51d9f..e44e8b2ca 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java @@ -58,7 +58,8 @@ public class SnapshotsExpiringProcess extends TableProcess implements LocalProce DefaultTableRuntime.CLEANUP_STATE_KEY, cleanUp -> cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis())); } catch (Throwable t) { - LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t); + LOG.error("unexpected expire error of table {}", tableRuntime.getTableIdentifier(), t); + throw new RuntimeException(t); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java deleted file mode 100644 index 3ad669092..000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.scheduler.inline; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.config.TableConfiguration; -import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; -import org.apache.amoro.server.scheduler.PeriodicTableScheduler; -import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.server.table.TableService; -import org.apache.amoro.server.table.cleanup.CleanupOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - -/** Clean table dangling delete files */ -public class DanglingDeleteFilesCleaningExecutor extends PeriodicTableScheduler { - - private static final Logger LOG = - LoggerFactory.getLogger(DanglingDeleteFilesCleaningExecutor.class); - - private final Duration interval; - - protected DanglingDeleteFilesCleaningExecutor( - TableService tableService, int poolSize, Duration interval) { - super(tableService, poolSize); - this.interval = interval; - } - - @Override - protected long getNextExecutingTime(TableRuntime tableRuntime) { - return interval.toMillis(); - } - - @Override - protected boolean shouldExecute(Long lastCleanupEndTime) { - return System.currentTimeMillis() - lastCleanupEndTime >= interval.toMillis(); - } - - @Override - protected CleanupOperation getCleanupOperation() { - return CleanupOperation.DANGLING_DELETE_FILES_CLEANING; - } - - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return tableRuntime instanceof DefaultTableRuntime - && tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled(); - } - - @Override - public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - 
scheduleIfNecessary(tableRuntime, getStartDelay()); - } - - @Override - protected long getExecutorDelay() { - return ThreadLocalRandom.current().nextLong(interval.toMillis()); - } - - @Override - protected void execute(TableRuntime tableRuntime) { - try { - LOG.info("{} start cleaning dangling delete files", tableRuntime.getTableIdentifier()); - AmoroTable<?> amoroTable = loadTable(tableRuntime); - TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, tableRuntime); - tableMaintainer.cleanDanglingDeleteFiles(); - } catch (Throwable t) { - LOG.error("{} failed to clean dangling delete file", tableRuntime.getTableIdentifier(), t); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index 4e5b4beb3..4bb45f73a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -28,7 +28,6 @@ public class InlineTableExecutors { private static final InlineTableExecutors instance = new InlineTableExecutors(); private TableRuntimeRefreshExecutor tableRefreshingExecutor; - private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor; private BlockerExpiringExecutor blockerExpiringExecutor; private OptimizingCommitExecutor optimizingCommitExecutor; private ProcessDataExpiringExecutor processDataExpiringExecutor; @@ -41,13 +40,6 @@ public class InlineTableExecutors { } public void setup(TableService tableService, Configurations conf) { - if (conf.getBoolean(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) { - this.danglingDeleteFilesCleaningExecutor = - new DanglingDeleteFilesCleaningExecutor( - tableService, - conf.getInteger(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT), - 
conf.get(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_INTERVAL)); - } this.optimizingCommitExecutor = new OptimizingCommitExecutor( tableService, conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT)); @@ -100,10 +92,6 @@ public class InlineTableExecutors { return tableRefreshingExecutor; } - public DanglingDeleteFilesCleaningExecutor getDanglingDeleteFilesCleaningExecutor() { - return danglingDeleteFilesCleaningExecutor; - } - public BlockerExpiringExecutor getBlockerExpiringExecutor() { return blockerExpiringExecutor; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index f4a4ef73e..1c6dd25f8 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -342,8 +342,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime { public long getLastCleanTime(CleanupOperation operation) { TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY); switch (operation) { - case DANGLING_DELETE_FILES_CLEANING: - return state.getLastDanglingDeleteFilesCleanTime(); case DATA_EXPIRING: return state.getLastDataExpiringTime(); case SNAPSHOTS_EXPIRING: @@ -360,9 +358,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime { CLEANUP_STATE_KEY, state -> { switch (operation) { - case DANGLING_DELETE_FILES_CLEANING: - state.setLastDanglingDeleteFilesCleanTime(time); - break; case DATA_EXPIRING: state.setLastDataExpiringTime(time); break; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java index b6597db82..478d607f5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java +++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java @@ -20,7 +20,6 @@ package org.apache.amoro.server.table.cleanup; /** Table cleanup operation enum. Defines different operation types for table cleanup tasks. */ public enum CleanupOperation { - DANGLING_DELETE_FILES_CLEANING, DATA_EXPIRING, SNAPSHOTS_EXPIRING, // NONE indicates operation types where no cleanup process records are diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java index 639506ea9..21f112584 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java @@ -37,8 +37,10 @@ public class TableRuntimeCleanupState { return lastDanglingDeleteFilesCleanTime; } - public void setLastDanglingDeleteFilesCleanTime(long lastDanglingDeleteFilesCleanTime) { + public TableRuntimeCleanupState setLastDanglingDeleteFilesCleanTime( + long lastDanglingDeleteFilesCleanTime) { this.lastDanglingDeleteFilesCleanTime = lastDanglingDeleteFilesCleanTime; + return this; } public long getLastDataExpiringTime() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java index 61b00b833..825410296 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -46,6 +46,8 @@ public class TestIcebergProcessFactory { assertSupportedAction("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, Duration.ofHours(1)); assertSupportedAction( "clean-orphan-files", IcebergActions.DELETE_ORPHANS, Duration.ofHours(24)); + 
assertSupportedAction( + "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, Duration.ofHours(24)); } @Test @@ -54,6 +56,11 @@ public class TestIcebergProcessFactory { "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, SnapshotsExpiringProcess.class, 0); assertTriggerWhenDue( "clean-orphan-files", IcebergActions.DELETE_ORPHANS, OrphanFilesCleaningProcess.class, 0); + assertTriggerWhenDue( + "clean-dangling-delete-files", + IcebergActions.CLEAN_DANGLING_DELETE, + DanglingDeleteFilesCleaningProcess.class, + 0); } @Test @@ -62,12 +69,18 @@ public class TestIcebergProcessFactory { "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, System.currentTimeMillis()); assertTriggerNotDue( "clean-orphan-files", IcebergActions.DELETE_ORPHANS, System.currentTimeMillis()); + assertTriggerNotDue( + "clean-dangling-delete-files", + IcebergActions.CLEAN_DANGLING_DELETE, + System.currentTimeMillis()); } @Test public void testTriggerActionDisabled() { assertTriggerDisabled("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS, false, 0); assertTriggerDisabled("clean-orphan-files", IcebergActions.DELETE_ORPHANS, false, 0); + assertTriggerDisabled( + "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, false, 0); } private void assertSupportedAction( @@ -153,6 +166,8 @@ public class TestIcebergProcessFactory { tableConfiguration.setExpireSnapshotEnabled(enabled); } else if ("clean-orphan-files".equals(configKey)) { tableConfiguration.setCleanOrphanEnabled(enabled); + } else if ("clean-dangling-delete-files".equals(configKey)) { + tableConfiguration.setDeleteDanglingDeleteFilesEnabled(enabled); } TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState(); @@ -160,11 +175,14 @@ public class TestIcebergProcessFactory { cleanupState.setLastSnapshotsExpiringTime(lastTime); } else if ("clean-orphan-files".equals(configKey)) { cleanupState.setLastOrphanFilesCleanTime(lastTime); + } else if ("clean-dangling-delete-files".equals(configKey)) { + 
cleanupState.setLastDanglingDeleteFilesCleanTime(lastTime); } TableRuntime runtime = mock(TableRuntime.class); doReturn(tableConfiguration).when(runtime).getTableConfiguration(); doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); + return runtime; } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java index ba2a8767c..f1c13a666 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java @@ -31,7 +31,6 @@ class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { private final CleanupOperation cleanupOperation; private final boolean enabled; private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour - private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour public PeriodicTableSchedulerTestBase( @@ -72,8 +71,6 @@ class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { switch (cleanupOperation) { case SNAPSHOTS_EXPIRING: return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL; - case DANGLING_DELETE_FILES_CLEANING: - return currentTime - lastCleanupEndTime >= DANGLING_DELETE_FILES_CLEANING_INTERVAL; case DATA_EXPIRING: return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL; default: diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java index 5b50245b6..6c8e05142 100644 --- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java @@ -25,46 +25,9 @@ import org.mockito.Mockito; import java.time.Duration; -/** - * Tests for configurable interval in DanglingDeleteFilesCleaningExecutor and - * SnapshotsExpiringExecutor. - */ +/** Tests for configurable interval in SnapshotsExpiringExecutor and ProcessDataExpiringExecutor. */ public class TestConfigurableIntervalExecutors { - @Test - public void testDanglingDeleteFilesDefaultInterval() { - Duration interval = Duration.ofDays(1); - DanglingDeleteFilesCleaningExecutor executor = - new DanglingDeleteFilesCleaningExecutor(null, 1, interval); - - TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); - Assert.assertEquals(Duration.ofDays(1).toMillis(), executor.getNextExecutingTime(tableRuntime)); - } - - @Test - public void testDanglingDeleteFilesCustomInterval() { - Duration interval = Duration.ofHours(12); - DanglingDeleteFilesCleaningExecutor executor = - new DanglingDeleteFilesCleaningExecutor(null, 1, interval); - - TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); - Assert.assertEquals( - Duration.ofHours(12).toMillis(), executor.getNextExecutingTime(tableRuntime)); - } - - @Test - public void testDanglingDeleteFilesShouldExecuteAfterInterval() { - Duration interval = Duration.ofHours(6); - DanglingDeleteFilesCleaningExecutor executor = - new DanglingDeleteFilesCleaningExecutor(null, 1, interval); - - long now = System.currentTimeMillis(); - // 7 hours ago - should execute - Assert.assertTrue(executor.shouldExecute(now - Duration.ofHours(7).toMillis())); - // 5 hours ago - should not execute - Assert.assertFalse(executor.shouldExecute(now - Duration.ofHours(5).toMillis())); - } - @Test public void testSnapshotsExpiringDefaultInterval() { Duration interval = Duration.ofHours(1); diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java index fef2a3bd4..7bee81602 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java @@ -166,10 +166,7 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { @Test public void testShouldExecuteTaskWithNoPreviousCleanup() { List<CleanupOperation> operations = - Arrays.asList( - CleanupOperation.DANGLING_DELETE_FILES_CLEANING, - CleanupOperation.DATA_EXPIRING, - CleanupOperation.SNAPSHOTS_EXPIRING); + Arrays.asList(CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); for (CleanupOperation operation : operations) { List<Long> testTableIds = Collections.singletonList(1L); @@ -190,10 +187,7 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { @Test public void testShouldNotExecuteTaskWithRecentCleanup() { List<CleanupOperation> operations = - Arrays.asList( - CleanupOperation.DANGLING_DELETE_FILES_CLEANING, - CleanupOperation.DATA_EXPIRING, - CleanupOperation.SNAPSHOTS_EXPIRING); + Arrays.asList(CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); for (CleanupOperation operation : operations) { List<Long> testTableIds = Collections.singletonList(1L); @@ -219,10 +213,7 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { @Test public void testShouldExecuteTaskWithOldCleanup() { List<CleanupOperation> operations = - Arrays.asList( - CleanupOperation.DANGLING_DELETE_FILES_CLEANING, - CleanupOperation.DATA_EXPIRING, - CleanupOperation.SNAPSHOTS_EXPIRING); + Arrays.asList(CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); for (CleanupOperation operation : operations) { List<Long> 
testTableIds = Collections.singletonList(1L); diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java b/amoro-common/src/main/java/org/apache/amoro/Action.java index 1d5f81b3b..826006a31 100644 --- a/amoro-common/src/main/java/org/apache/amoro/Action.java +++ b/amoro-common/src/main/java/org/apache/amoro/Action.java @@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; public final class Action { - private static final int MAX_NAME_LENGTH = 16; + private static final int MAX_NAME_LENGTH = 32; private static final Map<String, Action> registeredActions = new ConcurrentHashMap<>(); private final String name; diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 7b8319260..da1791e93 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -29,4 +29,5 @@ public class IcebergActions { public static final Action SYNC_HIVE = Action.register("sync-hive"); public static final Action EXPIRE_DATA = Action.register("expire-data"); public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots"); + public static final Action CLEAN_DANGLING_DELETE = Action.register("clean-dangling-delete-files"); } diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java index ed1b26d53..31a9331ee 100644 --- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java @@ -47,6 +47,7 @@ public class TestLocalExecutionEngine { public void testSubmitUsesCustomPoolByTag() throws Exception { assertCustomPoolByTag("snapshots-expiring"); assertCustomPoolByTag("orphan-files-cleaning"); + assertCustomPoolByTag("clean-dangling-delete-files"); } private 
void assertCustomPoolByTag(String tag) throws Exception { @@ -154,6 +155,7 @@ public class TestLocalExecutionEngine { properties.put("pool.default.thread-count", "1"); properties.put("pool.snapshots-expiring.thread-count", "1"); properties.put("pool.orphan-files-cleaning.thread-count", "1"); + properties.put("pool.clean-dangling-delete-files.thread-count", "1"); properties.put("process.status.ttl", ttl); localEngine.open(properties); return localEngine; diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index d00566f2c..b11d2bed3 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -103,10 +103,6 @@ data: enabled: true thread-count: 10 - clean-dangling-delete-files: - enabled: true - thread-count: 10 - sync-hive-tables: enabled: false thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index e5fc2654a..e4e9399f2 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -112,10 +112,6 @@ ams: enabled: true thread-count: 10 - clean-dangling-delete-files: - enabled: true - thread-count: 10 - sync-hive-tables: enabled: false thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml index 52e062629..5eefcd798 100755 --- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -23,3 +23,4 @@ execute-engines: pool.default.thread-count: 10 pool.snapshots-expiring.thread-count: 10 pool.orphan-files-cleaning.thread-count: 10 + pool.clean-dangling-delete-files.thread-count: 10 \ No newline at end of file diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml index 5825fde34..ff7719384 100755 --- 
a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -24,3 +24,5 @@ process-factories: expire-snapshots.interval: "1h" clean-orphan-files.enabled: "true" clean-orphan-files.interval: "1d" + clean-dangling-delete-files.enabled: "true" + clean-dangling-delete-files.interval: "1d" diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md index 060923b3c..216b1a3e4 100644 --- a/docs/admin-guides/deployment.md +++ b/docs/admin-guides/deployment.md @@ -274,6 +274,8 @@ process-factories: expire-snapshots.interval: "1h" # interval for expiring snapshots clean-orphan-files.enabled: "true" # enable orphan files cleaning clean-orphan-files.interval: "1d" # interval for cleaning orphan files + clean-dangling-delete-files.enabled: "true" # enable dangling delete files cleaning + clean-dangling-delete-files.interval: "1d" # interval for cleaning dangling delete files ``` {{< hint info >}} @@ -304,6 +306,7 @@ execute-engines: pool.default.thread-count: 10 # default thread pool size pool.snapshots-expiring.thread-count: 10 # thread pool for snapshot expiration pool.orphan-files-cleaning.thread-count: 10 # thread pool for orphan file cleaning + pool.clean-dangling-delete-files.thread-count: 10 # thread pool for dangling delete files cleaning process.status.ttl: 4h # TTL for process status cache ``` diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index aac5c4c42..51428b605 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -49,9 +49,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | auto-create-tags.thread-count | 3 | The number of threads used for creating tags. | | blocker.timeout | 1 min | Session timeout. Default unit is milliseconds if not specified. | | catalog-meta-cache.expiration-interval | 1 min | TTL for catalog metadata. 
| -| clean-dangling-delete-files.enabled | true | Enable dangling delete files cleaning. | -| clean-dangling-delete-files.interval | 1 d | Interval for cleaning dangling delete files. | -| clean-dangling-delete-files.thread-count | 10 | The number of threads used for dangling delete files cleaning. | | data-expiration.enabled | true | Enable data expiration | | data-expiration.interval | 1 d | Execute interval for data expiration | | data-expiration.thread-count | 10 | The number of threads used for data expiring |
