This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 0c5d3d84c [AMORO-4216] Refactor dangling-delete-files-cleaning via
ProcessFactory plugin (#4214)
0c5d3d84c is described below
commit 0c5d3d84c6ba544adb10bdd4d4893de489e3dd4b
Author: WenLingzhang <[email protected]>
AuthorDate: Tue May 19 10:24:53 2026 +0800
[AMORO-4216] Refactor dangling-delete-files-cleaning via ProcessFactory
plugin (#4214)
* Refactor dangling-delete-files-cleaning via ProcessFactory plugin
# Conflicts:
#
amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
#
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
#
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
* Fix IcebergActions init failure by increasing MAX_NAME_LENGTH to 32
* Rename action to clean-dangling-delete-files to fix pool tag mismatch
* Fix: LocalProcess exceptions were swallowed, preventing state from being
updated to FAILED
Problem Analysis:
- In DanglingDeleteFilesCleaningProcess, SnapshotsExpiringProcess, and
OrphanFilesCleaningProcess, exceptions in the run() method were caught but not
re-thrown
- This caused LocalExecutionEngine.ProcessHolder.onComplete() to never
detect failures
- Process status was always set to SUCCESS even when execution actually
failed
Fix:
- Add `throw new RuntimeException(t);` in the catch block to re-throw exceptions
- Keep existing logging for troubleshooting
- Ensure exceptions properly propagate to ProcessHolder so status is
correctly updated to FAILED
Modified Files:
- amoro-ams/src/main/java/.../DanglingDeleteFilesCleaningProcess.java
- amoro-ams/src/main/java/.../SnapshotsExpiringProcess.java
- amoro-ams/src/main/java/.../OrphanFilesCleaningProcess.java
* Fix logging message for dangling delete files error
* Fix formatting of class documentation comment
* fixup style
---------
Co-authored-by: 张文领 <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 18 -----
.../apache/amoro/server/AmoroServiceContainer.java | 1 -
...ava => DanglingDeleteFilesCleaningProcess.java} | 21 +++--
.../process/iceberg/IcebergProcessFactory.java | 34 ++++++++
.../iceberg/OrphanFilesCleaningProcess.java | 1 +
.../process/iceberg/SnapshotsExpiringProcess.java | 3 +-
.../DanglingDeleteFilesCleaningExecutor.java | 92 ----------------------
.../scheduler/inline/InlineTableExecutors.java | 12 ---
.../amoro/server/table/DefaultTableRuntime.java | 5 --
.../server/table/cleanup/CleanupOperation.java | 1 -
.../table/cleanup/TableRuntimeCleanupState.java | 4 +-
.../process/iceberg/TestIcebergProcessFactory.java | 18 +++++
.../inline/PeriodicTableSchedulerTestBase.java | 3 -
.../inline/TestConfigurableIntervalExecutors.java | 39 +--------
.../inline/TestPeriodicTableSchedulerCleanup.java | 15 +---
.../src/main/java/org/apache/amoro/Action.java | 2 +-
.../main/java/org/apache/amoro/IcebergActions.java | 1 +
.../amoro/process/TestLocalExecutionEngine.java | 2 +
charts/amoro/templates/amoro-configmap.yaml | 4 -
dist/src/main/amoro-bin/conf/config.yaml | 4 -
.../amoro-bin/conf/plugins/execute-engines.yaml | 1 +
.../amoro-bin/conf/plugins/process-factories.yaml | 2 +
docs/admin-guides/deployment.md | 3 +
docs/configuration/ams-config.md | 3 -
24 files changed, 85 insertions(+), 204 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 49eade550..759b76700 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -174,24 +174,6 @@ public class AmoroManagementConf {
.defaultValue(Duration.ofHours(1))
.withDescription("Interval for expiring snapshots.");
- public static final ConfigOption<Boolean>
CLEAN_DANGLING_DELETE_FILES_ENABLED =
- ConfigOptions.key("clean-dangling-delete-files.enabled")
- .booleanType()
- .defaultValue(true)
- .withDescription("Enable dangling delete files cleaning.");
-
- public static final ConfigOption<Integer>
CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT =
- ConfigOptions.key("clean-dangling-delete-files.thread-count")
- .intType()
- .defaultValue(10)
- .withDescription("The number of threads used for dangling delete
files cleaning.");
-
- public static final ConfigOption<Duration>
CLEAN_DANGLING_DELETE_FILES_INTERVAL =
- ConfigOptions.key("clean-dangling-delete-files.interval")
- .durationType()
- .defaultValue(Duration.ofDays(1))
- .withDescription("Interval for cleaning dangling delete files.");
-
public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled")
.booleanType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index d174e559e..27615ab31 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -290,7 +290,6 @@ public class AmoroServiceContainer {
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(processService.getTableHandlerChain());
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
-
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java
similarity index 74%
copy from
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
copy to
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java
index 22e4c05ee..5c5fef19c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DanglingDeleteFilesCleaningProcess.java
@@ -34,12 +34,13 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-/** Local table process for cleaning Iceberg orphan files. */
-public class OrphanFilesCleaningProcess extends TableProcess implements
LocalProcess {
+/** Local table process for expiring Iceberg dangling delete files. */
+public class DanglingDeleteFilesCleaningProcess extends TableProcess
implements LocalProcess {
- private static final Logger LOG =
LoggerFactory.getLogger(OrphanFilesCleaningProcess.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DanglingDeleteFilesCleaningProcess.class);
- public OrphanFilesCleaningProcess(TableRuntime tableRuntime, ExecuteEngine
engine) {
+ public DanglingDeleteFilesCleaningProcess(TableRuntime tableRuntime,
ExecuteEngine engine) {
super(tableRuntime, engine);
}
@@ -53,18 +54,22 @@ public class OrphanFilesCleaningProcess extends
TableProcess implements LocalPro
try {
AmoroTable<?> amoroTable = tableRuntime.loadTable();
TableMaintainer tableMaintainer =
TableMaintainerFactory.create(amoroTable, tableRuntime);
- tableMaintainer.cleanOrphanFiles();
+ tableMaintainer.cleanDanglingDeleteFiles();
tableRuntime.updateState(
DefaultTableRuntime.CLEANUP_STATE_KEY,
- cleanUp ->
cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis()));
+ cleanUp ->
cleanUp.setLastDanglingDeleteFilesCleanTime(System.currentTimeMillis()));
} catch (Throwable t) {
- LOG.error("Failed to clean orphan files for table {}",
tableRuntime.getTableIdentifier(), t);
+ LOG.error(
+ "unexpected dangling delete files cleaning error of table {}",
+ tableRuntime.getTableIdentifier(),
+ t);
+ throw new RuntimeException(t);
}
}
@Override
public Action getAction() {
- return IcebergActions.DELETE_ORPHANS;
+ return IcebergActions.CLEAN_DANGLING_DELETE;
}
@Override
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 1f7813476..813892595 100755
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -65,6 +65,14 @@ public class IcebergProcessFactory implements ProcessFactory
{
.durationType()
.defaultValue(Duration.ofDays(1));
+ public static final ConfigOption<Boolean>
DANGLING_DELETE_FILES_CLEANING_ENABLED =
+
ConfigOptions.key("clean-dangling-delete-files.enabled").booleanType().defaultValue(true);
+
+ public static final ConfigOption<Duration>
DANGLING_DELETE_FILES_CLEANING_INTERVAL =
+ ConfigOptions.key("clean-dangling-delete-files.interval")
+ .durationType()
+ .defaultValue(Duration.ofDays(1));
+
private ExecuteEngine localEngine;
private final Map<Action, ProcessTriggerStrategy> actions =
Maps.newHashMap();
private final List<TableFormat> formats =
@@ -101,6 +109,8 @@ public class IcebergProcessFactory implements
ProcessFactory {
return triggerExpireSnapshot(tableRuntime);
} else if (IcebergActions.DELETE_ORPHANS.equals(action)) {
return triggerCleanOrphans(tableRuntime);
+ } else if (IcebergActions.CLEAN_DANGLING_DELETE.equals(action)) {
+ return triggerCleanDanglingDelete(tableRuntime);
}
return Optional.empty();
@@ -130,6 +140,12 @@ public class IcebergProcessFactory implements
ProcessFactory {
this.actions.put(
IcebergActions.DELETE_ORPHANS,
ProcessTriggerStrategy.triggerAtFixRate(interval));
}
+
+ if (configs.getBoolean(DANGLING_DELETE_FILES_CLEANING_ENABLED)) {
+ Duration interval =
configs.getDuration(DANGLING_DELETE_FILES_CLEANING_INTERVAL);
+ this.actions.put(
+ IcebergActions.CLEAN_DANGLING_DELETE,
ProcessTriggerStrategy.triggerAtFixRate(interval));
+ }
}
private Optional<TableProcess> triggerExpireSnapshot(TableRuntime
tableRuntime) {
@@ -162,6 +178,24 @@ public class IcebergProcessFactory implements
ProcessFactory {
return Optional.of(new OrphanFilesCleaningProcess(tableRuntime,
localEngine));
}
+ private Optional<TableProcess> triggerCleanDanglingDelete(TableRuntime
tableRuntime) {
+ if (localEngine == null
+ ||
!tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled()) {
+ return Optional.empty();
+ }
+
+ long lastExecuteTime =
+ tableRuntime
+ .getState(DefaultTableRuntime.CLEANUP_STATE_KEY)
+ .getLastDanglingDeleteFilesCleanTime();
+ ProcessTriggerStrategy strategy =
actions.get(IcebergActions.CLEAN_DANGLING_DELETE);
+ if (System.currentTimeMillis() - lastExecuteTime <
strategy.getTriggerInterval().toMillis()) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new DanglingDeleteFilesCleaningProcess(tableRuntime,
localEngine));
+ }
+
@Override
public void close() {}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
index 22e4c05ee..b17f44f2c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
@@ -59,6 +59,7 @@ public class OrphanFilesCleaningProcess extends TableProcess
implements LocalPro
cleanUp ->
cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis()));
} catch (Throwable t) {
LOG.error("Failed to clean orphan files for table {}",
tableRuntime.getTableIdentifier(), t);
+ throw new RuntimeException(t);
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
index 3aea51d9f..e44e8b2ca 100755
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
@@ -58,7 +58,8 @@ public class SnapshotsExpiringProcess extends TableProcess
implements LocalProce
DefaultTableRuntime.CLEANUP_STATE_KEY,
cleanUp ->
cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis()));
} catch (Throwable t) {
- LOG.error("unexpected expire error of table {} ",
tableRuntime.getTableIdentifier(), t);
+ LOG.error("unexpected expire error of table {}",
tableRuntime.getTableIdentifier(), t);
+ throw new RuntimeException(t);
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
deleted file mode 100644
index 3ad669092..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.AmoroTable;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.maintainer.TableMaintainer;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.concurrent.ThreadLocalRandom;
-
-/** Clean table dangling delete files */
-public class DanglingDeleteFilesCleaningExecutor extends
PeriodicTableScheduler {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(DanglingDeleteFilesCleaningExecutor.class);
-
- private final Duration interval;
-
- protected DanglingDeleteFilesCleaningExecutor(
- TableService tableService, int poolSize, Duration interval) {
- super(tableService, poolSize);
- this.interval = interval;
- }
-
- @Override
- protected long getNextExecutingTime(TableRuntime tableRuntime) {
- return interval.toMillis();
- }
-
- @Override
- protected boolean shouldExecute(Long lastCleanupEndTime) {
- return System.currentTimeMillis() - lastCleanupEndTime >=
interval.toMillis();
- }
-
- @Override
- protected CleanupOperation getCleanupOperation() {
- return CleanupOperation.DANGLING_DELETE_FILES_CLEANING;
- }
-
- @Override
- protected boolean enabled(TableRuntime tableRuntime) {
- return tableRuntime instanceof DefaultTableRuntime
- &&
tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
- }
-
- @Override
- public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
- scheduleIfNecessary(tableRuntime, getStartDelay());
- }
-
- @Override
- protected long getExecutorDelay() {
- return ThreadLocalRandom.current().nextLong(interval.toMillis());
- }
-
- @Override
- protected void execute(TableRuntime tableRuntime) {
- try {
- LOG.info("{} start cleaning dangling delete files",
tableRuntime.getTableIdentifier());
- AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
- tableMaintainer.cleanDanglingDeleteFiles();
- } catch (Throwable t) {
- LOG.error("{} failed to clean dangling delete file",
tableRuntime.getTableIdentifier(), t);
- }
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 4e5b4beb3..4bb45f73a 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -28,7 +28,6 @@ public class InlineTableExecutors {
private static final InlineTableExecutors instance = new
InlineTableExecutors();
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
- private DanglingDeleteFilesCleaningExecutor
danglingDeleteFilesCleaningExecutor;
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
private ProcessDataExpiringExecutor processDataExpiringExecutor;
@@ -41,13 +40,6 @@ public class InlineTableExecutors {
}
public void setup(TableService tableService, Configurations conf) {
- if
(conf.getBoolean(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) {
- this.danglingDeleteFilesCleaningExecutor =
- new DanglingDeleteFilesCleaningExecutor(
- tableService,
-
conf.getInteger(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT),
-
conf.get(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_INTERVAL));
- }
this.optimizingCommitExecutor =
new OptimizingCommitExecutor(
tableService,
conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT));
@@ -100,10 +92,6 @@ public class InlineTableExecutors {
return tableRefreshingExecutor;
}
- public DanglingDeleteFilesCleaningExecutor
getDanglingDeleteFilesCleaningExecutor() {
- return danglingDeleteFilesCleaningExecutor;
- }
-
public BlockerExpiringExecutor getBlockerExpiringExecutor() {
return blockerExpiringExecutor;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index f4a4ef73e..1c6dd25f8 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -342,8 +342,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
public long getLastCleanTime(CleanupOperation operation) {
TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY);
switch (operation) {
- case DANGLING_DELETE_FILES_CLEANING:
- return state.getLastDanglingDeleteFilesCleanTime();
case DATA_EXPIRING:
return state.getLastDataExpiringTime();
case SNAPSHOTS_EXPIRING:
@@ -360,9 +358,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
CLEANUP_STATE_KEY,
state -> {
switch (operation) {
- case DANGLING_DELETE_FILES_CLEANING:
- state.setLastDanglingDeleteFilesCleanTime(time);
- break;
case DATA_EXPIRING:
state.setLastDataExpiringTime(time);
break;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
index b6597db82..478d607f5 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
@@ -20,7 +20,6 @@ package org.apache.amoro.server.table.cleanup;
/** Table cleanup operation enum. Defines different operation types for table
cleanup tasks. */
public enum CleanupOperation {
- DANGLING_DELETE_FILES_CLEANING,
DATA_EXPIRING,
SNAPSHOTS_EXPIRING,
// NONE indicates operation types where no cleanup process records are
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
index 639506ea9..21f112584 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -37,8 +37,10 @@ public class TableRuntimeCleanupState {
return lastDanglingDeleteFilesCleanTime;
}
- public void setLastDanglingDeleteFilesCleanTime(long
lastDanglingDeleteFilesCleanTime) {
+ public TableRuntimeCleanupState setLastDanglingDeleteFilesCleanTime(
+ long lastDanglingDeleteFilesCleanTime) {
this.lastDanglingDeleteFilesCleanTime = lastDanglingDeleteFilesCleanTime;
+ return this;
}
public long getLastDataExpiringTime() {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 61b00b833..825410296 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -46,6 +46,8 @@ public class TestIcebergProcessFactory {
assertSupportedAction("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
Duration.ofHours(1));
assertSupportedAction(
"clean-orphan-files", IcebergActions.DELETE_ORPHANS,
Duration.ofHours(24));
+ assertSupportedAction(
+ "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE,
Duration.ofHours(24));
}
@Test
@@ -54,6 +56,11 @@ public class TestIcebergProcessFactory {
"expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
SnapshotsExpiringProcess.class, 0);
assertTriggerWhenDue(
"clean-orphan-files", IcebergActions.DELETE_ORPHANS,
OrphanFilesCleaningProcess.class, 0);
+ assertTriggerWhenDue(
+ "clean-dangling-delete-files",
+ IcebergActions.CLEAN_DANGLING_DELETE,
+ DanglingDeleteFilesCleaningProcess.class,
+ 0);
}
@Test
@@ -62,12 +69,18 @@ public class TestIcebergProcessFactory {
"expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
System.currentTimeMillis());
assertTriggerNotDue(
"clean-orphan-files", IcebergActions.DELETE_ORPHANS,
System.currentTimeMillis());
+ assertTriggerNotDue(
+ "clean-dangling-delete-files",
+ IcebergActions.CLEAN_DANGLING_DELETE,
+ System.currentTimeMillis());
}
@Test
public void testTriggerActionDisabled() {
assertTriggerDisabled("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
false, 0);
assertTriggerDisabled("clean-orphan-files", IcebergActions.DELETE_ORPHANS,
false, 0);
+ assertTriggerDisabled(
+ "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE,
false, 0);
}
private void assertSupportedAction(
@@ -153,6 +166,8 @@ public class TestIcebergProcessFactory {
tableConfiguration.setExpireSnapshotEnabled(enabled);
} else if ("clean-orphan-files".equals(configKey)) {
tableConfiguration.setCleanOrphanEnabled(enabled);
+ } else if ("clean-dangling-delete-files".equals(configKey)) {
+ tableConfiguration.setDeleteDanglingDeleteFilesEnabled(enabled);
}
TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState();
@@ -160,11 +175,14 @@ public class TestIcebergProcessFactory {
cleanupState.setLastSnapshotsExpiringTime(lastTime);
} else if ("clean-orphan-files".equals(configKey)) {
cleanupState.setLastOrphanFilesCleanTime(lastTime);
+ } else if ("clean-dangling-delete-files".equals(configKey)) {
+ cleanupState.setLastDanglingDeleteFilesCleanTime(lastTime);
}
TableRuntime runtime = mock(TableRuntime.class);
doReturn(tableConfiguration).when(runtime).getTableConfiguration();
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
return runtime;
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
index ba2a8767c..f1c13a666 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
@@ -31,7 +31,6 @@ class PeriodicTableSchedulerTestBase extends
PeriodicTableScheduler {
private final CleanupOperation cleanupOperation;
private final boolean enabled;
private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; //
1 hour
- private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60
* 60 * 1000L;
private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour
public PeriodicTableSchedulerTestBase(
@@ -72,8 +71,6 @@ class PeriodicTableSchedulerTestBase extends
PeriodicTableScheduler {
switch (cleanupOperation) {
case SNAPSHOTS_EXPIRING:
return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL;
- case DANGLING_DELETE_FILES_CLEANING:
- return currentTime - lastCleanupEndTime >=
DANGLING_DELETE_FILES_CLEANING_INTERVAL;
case DATA_EXPIRING:
return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL;
default:
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
index 5b50245b6..6c8e05142 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
@@ -25,46 +25,9 @@ import org.mockito.Mockito;
import java.time.Duration;
-/**
- * Tests for configurable interval in DanglingDeleteFilesCleaningExecutor and
- * SnapshotsExpiringExecutor.
- */
+/** Tests for configurable interval in SnapshotsExpiringExecutor and
ProcessDataExpiringExecutor. */
public class TestConfigurableIntervalExecutors {
- @Test
- public void testDanglingDeleteFilesDefaultInterval() {
- Duration interval = Duration.ofDays(1);
- DanglingDeleteFilesCleaningExecutor executor =
- new DanglingDeleteFilesCleaningExecutor(null, 1, interval);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(Duration.ofDays(1).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-
- @Test
- public void testDanglingDeleteFilesCustomInterval() {
- Duration interval = Duration.ofHours(12);
- DanglingDeleteFilesCleaningExecutor executor =
- new DanglingDeleteFilesCleaningExecutor(null, 1, interval);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(
- Duration.ofHours(12).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-
- @Test
- public void testDanglingDeleteFilesShouldExecuteAfterInterval() {
- Duration interval = Duration.ofHours(6);
- DanglingDeleteFilesCleaningExecutor executor =
- new DanglingDeleteFilesCleaningExecutor(null, 1, interval);
-
- long now = System.currentTimeMillis();
- // 7 hours ago - should execute
- Assert.assertTrue(executor.shouldExecute(now -
Duration.ofHours(7).toMillis()));
- // 5 hours ago - should not execute
- Assert.assertFalse(executor.shouldExecute(now -
Duration.ofHours(5).toMillis()));
- }
-
@Test
public void testSnapshotsExpiringDefaultInterval() {
Duration interval = Duration.ofHours(1);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
index fef2a3bd4..7bee81602 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -166,10 +166,7 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
@Test
public void testShouldExecuteTaskWithNoPreviousCleanup() {
List<CleanupOperation> operations =
- Arrays.asList(
- CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
- CleanupOperation.DATA_EXPIRING,
- CleanupOperation.SNAPSHOTS_EXPIRING);
+ Arrays.asList(CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
@@ -190,10 +187,7 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
@Test
public void testShouldNotExecuteTaskWithRecentCleanup() {
List<CleanupOperation> operations =
- Arrays.asList(
- CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
- CleanupOperation.DATA_EXPIRING,
- CleanupOperation.SNAPSHOTS_EXPIRING);
+ Arrays.asList(CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
@@ -219,10 +213,7 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
@Test
public void testShouldExecuteTaskWithOldCleanup() {
List<CleanupOperation> operations =
- Arrays.asList(
- CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
- CleanupOperation.DATA_EXPIRING,
- CleanupOperation.SNAPSHOTS_EXPIRING);
+ Arrays.asList(CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java
b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 1d5f81b3b..826006a31 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
public final class Action {
- private static final int MAX_NAME_LENGTH = 16;
+ private static final int MAX_NAME_LENGTH = 32;
private static final Map<String, Action> registeredActions = new
ConcurrentHashMap<>();
private final String name;
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index 7b8319260..da1791e93 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -29,4 +29,5 @@ public class IcebergActions {
public static final Action SYNC_HIVE = Action.register("sync-hive");
public static final Action EXPIRE_DATA = Action.register("expire-data");
public static final Action EXPIRE_SNAPSHOTS =
Action.register("expire-snapshots");
+ public static final Action CLEAN_DANGLING_DELETE =
Action.register("clean-dangling-delete-files");
}
diff --git
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
index ed1b26d53..31a9331ee 100644
---
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
+++
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -47,6 +47,7 @@ public class TestLocalExecutionEngine {
public void testSubmitUsesCustomPoolByTag() throws Exception {
assertCustomPoolByTag("snapshots-expiring");
assertCustomPoolByTag("orphan-files-cleaning");
+ assertCustomPoolByTag("clean-dangling-delete-files");
}
private void assertCustomPoolByTag(String tag) throws Exception {
@@ -154,6 +155,7 @@ public class TestLocalExecutionEngine {
properties.put("pool.default.thread-count", "1");
properties.put("pool.snapshots-expiring.thread-count", "1");
properties.put("pool.orphan-files-cleaning.thread-count", "1");
+ properties.put("pool.clean-dangling-delete-files.thread-count", "1");
properties.put("process.status.ttl", ttl);
localEngine.open(properties);
return localEngine;
diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml
index d00566f2c..b11d2bed3 100644
--- a/charts/amoro/templates/amoro-configmap.yaml
+++ b/charts/amoro/templates/amoro-configmap.yaml
@@ -103,10 +103,6 @@ data:
enabled: true
thread-count: 10
- clean-dangling-delete-files:
- enabled: true
- thread-count: 10
-
sync-hive-tables:
enabled: false
thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml
index e5fc2654a..e4e9399f2 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -112,10 +112,6 @@ ams:
enabled: true
thread-count: 10
- clean-dangling-delete-files:
- enabled: true
- thread-count: 10
-
sync-hive-tables:
enabled: false
thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 52e062629..5eefcd798 100755
--- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -23,3 +23,4 @@ execute-engines:
pool.default.thread-count: 10
pool.snapshots-expiring.thread-count: 10
pool.orphan-files-cleaning.thread-count: 10
+ pool.clean-dangling-delete-files.thread-count: 10
\ No newline at end of file
diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index 5825fde34..ff7719384 100755
--- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -24,3 +24,5 @@ process-factories:
expire-snapshots.interval: "1h"
clean-orphan-files.enabled: "true"
clean-orphan-files.interval: "1d"
+ clean-dangling-delete-files.enabled: "true"
+ clean-dangling-delete-files.interval: "1d"
diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md
index 060923b3c..216b1a3e4 100644
--- a/docs/admin-guides/deployment.md
+++ b/docs/admin-guides/deployment.md
@@ -274,6 +274,8 @@ process-factories:
expire-snapshots.interval: "1h" # interval for expiring snapshots
clean-orphan-files.enabled: "true" # enable orphan files cleaning
clean-orphan-files.interval: "1d" # interval for cleaning orphan files
+ clean-dangling-delete-files.enabled: "true" # enable dangling delete files cleaning
+ clean-dangling-delete-files.interval: "1d" # interval for cleaning dangling delete files
```
{{< hint info >}}
@@ -304,6 +306,7 @@ execute-engines:
pool.default.thread-count: 10 # default thread pool size
pool.snapshots-expiring.thread-count: 10 # thread pool for snapshot expiration
pool.orphan-files-cleaning.thread-count: 10 # thread pool for orphan file cleaning
+ pool.clean-dangling-delete-files.thread-count: 10 # thread pool for dangling delete files cleaning
process.status.ttl: 4h # TTL for process status cache
```
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index aac5c4c42..51428b605 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -49,9 +49,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| auto-create-tags.thread-count | 3 | The number of threads used for creating tags. |
| blocker.timeout | 1 min | Session timeout. Default unit is milliseconds if not specified. |
| catalog-meta-cache.expiration-interval | 1 min | TTL for catalog metadata. |
-| clean-dangling-delete-files.enabled | true | Enable dangling delete files cleaning. |
-| clean-dangling-delete-files.interval | 1 d | Interval for cleaning dangling delete files. |
-| clean-dangling-delete-files.thread-count | 10 | The number of threads used for dangling delete files cleaning. |
| data-expiration.enabled | true | Enable data expiration |
| data-expiration.interval | 1 d | Execute interval for data expiration |
| data-expiration.thread-count | 10 | The number of threads used for data expiring |