This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 75dd6570c [AMORO-4208] Refactor orphan-files-cleaning via
ProcessFactory plugin (#4209)
75dd6570c is described below
commit 75dd6570c0d8125bf892a2747342a16df089351d
Author: WenLingzhang <[email protected]>
AuthorDate: Thu May 14 15:07:50 2026 +0800
[AMORO-4208] Refactor orphan-files-cleaning via ProcessFactory plugin
(#4209)
* Refactor orphan-files-cleaning via ProcessFactory plugin
# Conflicts:
#
amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
* Remove unused orphan file cleaning configs and inline executor
* Add process factory and execute engine configuration docs
---------
Co-authored-by: 张文领 <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 18 ---
.../amoro/server/AmoroManagementConfValidator.java | 3 -
.../apache/amoro/server/AmoroServiceContainer.java | 1 -
.../process/iceberg/IcebergProcessFactory.java | 32 ++++++
.../iceberg/OrphanFilesCleaningProcess.java | 79 +++++++++++++
.../scheduler/inline/InlineTableExecutors.java | 12 --
.../inline/OrphanFilesCleaningExecutor.java | 85 --------------
.../amoro/server/table/DefaultTableRuntime.java | 5 -
.../server/table/cleanup/CleanupOperation.java | 1 -
.../table/cleanup/TableRuntimeCleanupState.java | 3 +-
.../org/apache/amoro/server/AmsEnvironment.java | 1 -
.../server/TestAmoroManagementConfValidator.java | 8 --
.../process/iceberg/TestIcebergProcessFactory.java | 128 ++++++++++++---------
.../inline/PeriodicTableSchedulerTestBase.java | 3 -
.../inline/TestPeriodicTableSchedulerCleanup.java | 3 -
.../amoro/process/TestLocalExecutionEngine.java | 10 +-
.../iceberg/maintainer/IcebergTableMaintainer.java | 2 +-
charts/amoro/templates/amoro-configmap.yaml | 4 -
dist/src/main/amoro-bin/conf/config.yaml | 5 -
.../amoro-bin/conf/plugins/execute-engines.yaml | 1 +
.../amoro-bin/conf/plugins/process-factories.yaml | 2 +
docs/admin-guides/deployment.md | 64 +++++++++++
docs/configuration/ams-config.md | 3 -
23 files changed, 265 insertions(+), 208 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 9ac6fb08e..49eade550 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -174,24 +174,6 @@ public class AmoroManagementConf {
.defaultValue(Duration.ofHours(1))
.withDescription("Interval for expiring snapshots.");
- public static final ConfigOption<Boolean> CLEAN_ORPHAN_FILES_ENABLED =
- ConfigOptions.key("clean-orphan-files.enabled")
- .booleanType()
- .defaultValue(true)
- .withDescription("Enable orphan files cleaning.");
-
- public static final ConfigOption<Integer> CLEAN_ORPHAN_FILES_THREAD_COUNT =
- ConfigOptions.key("clean-orphan-files.thread-count")
- .intType()
- .defaultValue(10)
- .withDescription("The number of threads used for orphan files
cleaning.");
-
- public static final ConfigOption<Duration> CLEAN_ORPHAN_FILES_INTERVAL =
- ConfigOptions.key("clean-orphan-files.interval")
- .durationType()
- .defaultValue(Duration.ofDays(1))
- .withDescription("Interval for cleaning orphan files.");
-
public static final ConfigOption<Boolean>
CLEAN_DANGLING_DELETE_FILES_ENABLED =
ConfigOptions.key("clean-dangling-delete-files.enabled")
.booleanType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
index 1e78520f0..627b577d6 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
@@ -86,9 +86,6 @@ public class AmoroManagementConfValidator {
validateThreadCount(configurations,
AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT);
}
- if
(configurations.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) {
- validateThreadCount(configurations,
AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT);
- }
if
(configurations.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
validateThreadCount(configurations,
AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 0be364523..d174e559e 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -290,7 +290,6 @@ public class AmoroServiceContainer {
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(processService.getTableHandlerChain());
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
-
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 18de241db..1f7813476 100755
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -57,6 +57,14 @@ public class IcebergProcessFactory implements ProcessFactory
{
.durationType()
.defaultValue(Duration.ofHours(1));
+ public static final ConfigOption<Boolean> ORPHAN_FILES_CLEANING_ENABLED =
+
ConfigOptions.key("clean-orphan-files.enabled").booleanType().defaultValue(true);
+
+ public static final ConfigOption<Duration> ORPHAN_FILES_CLEANING_INTERVAL =
+ ConfigOptions.key("clean-orphan-files.interval")
+ .durationType()
+ .defaultValue(Duration.ofDays(1));
+
private ExecuteEngine localEngine;
private final Map<Action, ProcessTriggerStrategy> actions =
Maps.newHashMap();
private final List<TableFormat> formats =
@@ -91,7 +99,10 @@ public class IcebergProcessFactory implements ProcessFactory
{
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
return triggerExpireSnapshot(tableRuntime);
+ } else if (IcebergActions.DELETE_ORPHANS.equals(action)) {
+ return triggerCleanOrphans(tableRuntime);
}
+
return Optional.empty();
}
@@ -113,6 +124,12 @@ public class IcebergProcessFactory implements
ProcessFactory {
this.actions.put(
IcebergActions.EXPIRE_SNAPSHOTS,
ProcessTriggerStrategy.triggerAtFixRate(interval));
}
+
+ if (configs.getBoolean(ORPHAN_FILES_CLEANING_ENABLED)) {
+ Duration interval = configs.getDuration(ORPHAN_FILES_CLEANING_INTERVAL);
+ this.actions.put(
+ IcebergActions.DELETE_ORPHANS,
ProcessTriggerStrategy.triggerAtFixRate(interval));
+ }
}
private Optional<TableProcess> triggerExpireSnapshot(TableRuntime
tableRuntime) {
@@ -130,6 +147,21 @@ public class IcebergProcessFactory implements
ProcessFactory {
return Optional.of(new SnapshotsExpiringProcess(tableRuntime,
localEngine));
}
+ private Optional<TableProcess> triggerCleanOrphans(TableRuntime
tableRuntime) {
+ if (localEngine == null ||
!tableRuntime.getTableConfiguration().isCleanOrphanEnabled()) {
+ return Optional.empty();
+ }
+
+ long lastExecuteTime =
+
tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastOrphanFilesCleanTime();
+ ProcessTriggerStrategy strategy =
actions.get(IcebergActions.DELETE_ORPHANS);
+ if (System.currentTimeMillis() - lastExecuteTime <
strategy.getTriggerInterval().toMillis()) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new OrphanFilesCleaningProcess(tableRuntime,
localEngine));
+ }
+
@Override
public void close() {}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
new file mode 100644
index 000000000..22e4c05ee
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/OrphanFilesCleaningProcess.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalProcess;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainerFactory;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Local table process for cleaning Iceberg orphan files. */
+public class OrphanFilesCleaningProcess extends TableProcess implements
LocalProcess {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OrphanFilesCleaningProcess.class);
+
+ public OrphanFilesCleaningProcess(TableRuntime tableRuntime, ExecuteEngine
engine) {
+ super(tableRuntime, engine);
+ }
+
+ @Override
+ public String tag() {
+ return getAction().getName().toLowerCase();
+ }
+
+ @Override
+ public void run() {
+ try {
+ AmoroTable<?> amoroTable = tableRuntime.loadTable();
+ TableMaintainer tableMaintainer =
TableMaintainerFactory.create(amoroTable, tableRuntime);
+ tableMaintainer.cleanOrphanFiles();
+ tableRuntime.updateState(
+ DefaultTableRuntime.CLEANUP_STATE_KEY,
+ cleanUp ->
cleanUp.setLastOrphanFilesCleanTime(System.currentTimeMillis()));
+ } catch (Throwable t) {
+ LOG.error("Failed to clean orphan files for table {}",
tableRuntime.getTableIdentifier(), t);
+ }
+ }
+
+ @Override
+ public Action getAction() {
+ return IcebergActions.DELETE_ORPHANS;
+ }
+
+ @Override
+ public Map<String, String> getProcessParameters() {
+ return Maps.newHashMap();
+ }
+
+ @Override
+ public Map<String, String> getSummary() {
+ return Maps.newHashMap();
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 173c0259e..4e5b4beb3 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -28,7 +28,6 @@ public class InlineTableExecutors {
private static final InlineTableExecutors instance = new
InlineTableExecutors();
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
- private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor;
private DanglingDeleteFilesCleaningExecutor
danglingDeleteFilesCleaningExecutor;
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
@@ -42,13 +41,6 @@ public class InlineTableExecutors {
}
public void setup(TableService tableService, Configurations conf) {
- if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) {
- this.orphanFilesCleaningExecutor =
- new OrphanFilesCleaningExecutor(
- tableService,
-
conf.getInteger(AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT),
- conf.get(AmoroManagementConf.CLEAN_ORPHAN_FILES_INTERVAL));
- }
if
(conf.getBoolean(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) {
this.danglingDeleteFilesCleaningExecutor =
new DanglingDeleteFilesCleaningExecutor(
@@ -108,10 +100,6 @@ public class InlineTableExecutors {
return tableRefreshingExecutor;
}
- public OrphanFilesCleaningExecutor getOrphanFilesCleaningExecutor() {
- return orphanFilesCleaningExecutor;
- }
-
public DanglingDeleteFilesCleaningExecutor
getDanglingDeleteFilesCleaningExecutor() {
return danglingDeleteFilesCleaningExecutor;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
deleted file mode 100644
index 19db0f192..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.AmoroTable;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.maintainer.TableMaintainer;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class OrphanFilesCleaningExecutor extends PeriodicTableScheduler {
- private static final Logger LOG =
LoggerFactory.getLogger(OrphanFilesCleaningExecutor.class);
- private final Duration interval;
-
- public OrphanFilesCleaningExecutor(TableService tableService, int poolSize,
Duration interval) {
- super(tableService, poolSize);
- this.interval = interval;
- }
-
- @Override
- protected long getNextExecutingTime(TableRuntime tableRuntime) {
- return interval.toMillis();
- }
-
- @Override
- protected boolean shouldExecute(Long lastCleanupEndTime) {
- return System.currentTimeMillis() - lastCleanupEndTime >=
interval.toMillis();
- }
-
- @Override
- protected CleanupOperation getCleanupOperation() {
- return CleanupOperation.ORPHAN_FILES_CLEANING;
- }
-
- @Override
- protected boolean enabled(TableRuntime tableRuntime) {
- return tableRuntime.getTableConfiguration().isCleanOrphanEnabled();
- }
-
- @Override
- public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
- scheduleIfNecessary(tableRuntime, getStartDelay());
- }
-
- @Override
- protected long getExecutorDelay() {
- return ThreadLocalRandom.current().nextLong(interval.toMillis());
- }
-
- @Override
- public void execute(TableRuntime tableRuntime) {
- try {
- LOG.info("{} start cleaning orphan files",
tableRuntime.getTableIdentifier());
- AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
- tableMaintainer.cleanOrphanFiles();
- } catch (Throwable t) {
- LOG.error("{} failed to clean orphan file",
tableRuntime.getTableIdentifier(), t);
- }
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 9b4b56a95..f4a4ef73e 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -342,8 +342,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
public long getLastCleanTime(CleanupOperation operation) {
TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY);
switch (operation) {
- case ORPHAN_FILES_CLEANING:
- return state.getLastOrphanFilesCleanTime();
case DANGLING_DELETE_FILES_CLEANING:
return state.getLastDanglingDeleteFilesCleanTime();
case DATA_EXPIRING:
@@ -362,9 +360,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
CLEANUP_STATE_KEY,
state -> {
switch (operation) {
- case ORPHAN_FILES_CLEANING:
- state.setLastOrphanFilesCleanTime(time);
- break;
case DANGLING_DELETE_FILES_CLEANING:
state.setLastDanglingDeleteFilesCleanTime(time);
break;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
index 10afefe63..b6597db82 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
@@ -21,7 +21,6 @@ package org.apache.amoro.server.table.cleanup;
/** Table cleanup operation enum. Defines different operation types for table
cleanup tasks. */
public enum CleanupOperation {
DANGLING_DELETE_FILES_CLEANING,
- ORPHAN_FILES_CLEANING,
DATA_EXPIRING,
SNAPSHOTS_EXPIRING,
// NONE indicates operation types where no cleanup process records are
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
index f65ae387c..639506ea9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -28,8 +28,9 @@ public class TableRuntimeCleanupState {
return lastOrphanFilesCleanTime;
}
- public void setLastOrphanFilesCleanTime(long lastOrphanFilesCleanTime) {
+ public TableRuntimeCleanupState setLastOrphanFilesCleanTime(long
lastOrphanFilesCleanTime) {
this.lastOrphanFilesCleanTime = lastOrphanFilesCleanTime;
+ return this;
}
public long getLastDanglingDeleteFilesCleanTime() {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index 93fd182d3..916765d26 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -369,7 +369,6 @@ public class AmsEnvironment {
+ " refresh-table-thread-count: 10\n"
+ " refresh-table-interval: 60000 #1min\n"
+ " expire-table-thread-count: 10\n"
- + " clean-orphan-file-thread-count: 10\n"
+ " sync-hive-tables-thread-count: 10\n"
+ "\n"
+ " thrift-server:\n"
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
index 8e52f779b..532bc1888 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConfValidator.java
@@ -135,14 +135,6 @@ public class TestAmoroManagementConfValidator {
configurations.setInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT,
10);
AmoroManagementConfValidator.validateConfig(configurations);
- configurations.setBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED,
true);
-
configurations.setInteger(AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT,
-1);
- Assert.assertThrows(
- IllegalArgumentException.class,
- () -> AmoroManagementConfValidator.validateConfig(configurations));
-
configurations.setInteger(AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT,
10);
- AmoroManagementConfValidator.validateConfig(configurations);
-
configurations.setBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED,
true);
configurations.setInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT,
-1);
Assert.assertThrows(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 35151ce3d..61b00b833 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -43,104 +43,128 @@ public class TestIcebergProcessFactory {
@Test
public void testOpenAndSupportedActions() {
+ assertSupportedAction("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
Duration.ofHours(1));
+ assertSupportedAction(
+ "clean-orphan-files", IcebergActions.DELETE_ORPHANS,
Duration.ofHours(24));
+ }
+
+ @Test
+ public void testTriggerActionWhenDue() {
+ assertTriggerWhenDue(
+ "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
SnapshotsExpiringProcess.class, 0);
+ assertTriggerWhenDue(
+ "clean-orphan-files", IcebergActions.DELETE_ORPHANS,
OrphanFilesCleaningProcess.class, 0);
+ }
+
+ @Test
+ public void testTriggerActionNotDue() {
+ assertTriggerNotDue(
+ "expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
System.currentTimeMillis());
+ assertTriggerNotDue(
+ "clean-orphan-files", IcebergActions.DELETE_ORPHANS,
System.currentTimeMillis());
+ }
+
+ @Test
+ public void testTriggerActionDisabled() {
+ assertTriggerDisabled("expire-snapshots", IcebergActions.EXPIRE_SNAPSHOTS,
false, 0);
+ assertTriggerDisabled("clean-orphan-files", IcebergActions.DELETE_ORPHANS,
false, 0);
+ }
+
+ private void assertSupportedAction(
+ String configKey, org.apache.amoro.Action action, Duration interval) {
IcebergProcessFactory factory = new IcebergProcessFactory();
Map<String, String> properties = new HashMap<>();
- properties.put("expire-snapshots.enabled", "true");
- properties.put("expire-snapshots.interval", "1h");
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", interval.toHours() + "h");
factory.open(properties);
Map<TableFormat, Set<org.apache.amoro.Action>> supported =
factory.supportedActions();
-
Assert.assertTrue(supported.get(TableFormat.ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS));
- Assert.assertTrue(
-
supported.get(TableFormat.MIXED_ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS));
- Assert.assertTrue(
-
supported.get(TableFormat.MIXED_HIVE).contains(IcebergActions.EXPIRE_SNAPSHOTS));
-
- ProcessTriggerStrategy strategy =
- factory.triggerStrategy(TableFormat.ICEBERG,
IcebergActions.EXPIRE_SNAPSHOTS);
- Assert.assertEquals(Duration.ofHours(1), strategy.getTriggerInterval());
+ Assert.assertTrue(supported.get(TableFormat.ICEBERG).contains(action));
+
Assert.assertTrue(supported.get(TableFormat.MIXED_ICEBERG).contains(action));
+ Assert.assertTrue(supported.get(TableFormat.MIXED_HIVE).contains(action));
+
+ ProcessTriggerStrategy strategy =
factory.triggerStrategy(TableFormat.ICEBERG, action);
+ Assert.assertEquals(interval, strategy.getTriggerInterval());
}
- @Test
- public void testTriggerExpireSnapshotWhenDue() {
+ private void assertTriggerWhenDue(
+ String configKey, org.apache.amoro.Action action, Class<?> processClass,
long lastTime) {
IcebergProcessFactory factory = new IcebergProcessFactory();
Map<String, String> properties = new HashMap<>();
- properties.put("expire-snapshots.enabled", "true");
- properties.put("expire-snapshots.interval", "1h");
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", "1h");
factory.open(properties);
LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
factory.availableExecuteEngines(Arrays.asList(localEngine));
- TableConfiguration tableConfiguration = new
TableConfiguration().setExpireSnapshotEnabled(true);
- TableRuntimeCleanupState cleanupState =
- new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0);
+ TableRuntime runtime = createRuntime(configKey, true, lastTime);
- TableRuntime runtime = mock(TableRuntime.class);
- doReturn(tableConfiguration).when(runtime).getTableConfiguration();
-
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
-
- Optional<org.apache.amoro.process.TableProcess> process =
- factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+ Optional<org.apache.amoro.process.TableProcess> process =
factory.trigger(runtime, action);
Assert.assertTrue(process.isPresent());
- Assert.assertTrue(process.get() instanceof SnapshotsExpiringProcess);
+ Assert.assertTrue(processClass.isInstance(process.get()));
Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME,
process.get().getExecutionEngine());
}
- @Test
- public void testTriggerExpireSnapshotNotDue() {
+ private void assertTriggerNotDue(
+ String configKey, org.apache.amoro.Action action, long lastTime) {
IcebergProcessFactory factory = new IcebergProcessFactory();
Map<String, String> properties = new HashMap<>();
- properties.put("expire-snapshots.enabled", "true");
- properties.put("expire-snapshots.interval", "1h");
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", "1h");
factory.open(properties);
factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class)));
- TableConfiguration tableConfiguration = new
TableConfiguration().setExpireSnapshotEnabled(true);
- long now = System.currentTimeMillis();
- TableRuntimeCleanupState cleanupState =
- new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(now);
-
- TableRuntime runtime = mock(TableRuntime.class);
- doReturn(tableConfiguration).when(runtime).getTableConfiguration();
-
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+ TableRuntime runtime = createRuntime(configKey, true, lastTime);
- Optional<org.apache.amoro.process.TableProcess> process =
- factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+ Optional<org.apache.amoro.process.TableProcess> process =
factory.trigger(runtime, action);
Assert.assertFalse(process.isPresent());
}
- @Test
- public void testTriggerExpireSnapshotDisabled() {
+ private void assertTriggerDisabled(
+ String configKey, org.apache.amoro.Action action, boolean enabled, long
lastTime) {
IcebergProcessFactory factory = new IcebergProcessFactory();
Map<String, String> properties = new HashMap<>();
- properties.put("expire-snapshots.enabled", "true");
- properties.put("expire-snapshots.interval", "1h");
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", "1h");
factory.open(properties);
factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class)));
- TableConfiguration tableConfiguration =
- new TableConfiguration().setExpireSnapshotEnabled(false);
- TableRuntimeCleanupState cleanupState =
- new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0);
+ TableRuntime runtime = createRuntime(configKey, enabled, lastTime);
+
+ Optional<org.apache.amoro.process.TableProcess> process =
factory.trigger(runtime, action);
+
+ Assert.assertFalse(process.isPresent());
+ }
+
+ private TableRuntime createRuntime(String configKey, boolean enabled, long
lastTime) {
+ TableConfiguration tableConfiguration = new TableConfiguration();
+ if ("expire-snapshots".equals(configKey)) {
+ tableConfiguration.setExpireSnapshotEnabled(enabled);
+ } else if ("clean-orphan-files".equals(configKey)) {
+ tableConfiguration.setCleanOrphanEnabled(enabled);
+ }
+
+ TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState();
+ if ("expire-snapshots".equals(configKey)) {
+ cleanupState.setLastSnapshotsExpiringTime(lastTime);
+ } else if ("clean-orphan-files".equals(configKey)) {
+ cleanupState.setLastOrphanFilesCleanTime(lastTime);
+ }
TableRuntime runtime = mock(TableRuntime.class);
doReturn(tableConfiguration).when(runtime).getTableConfiguration();
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
-
- Optional<org.apache.amoro.process.TableProcess> process =
- factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
-
- Assert.assertFalse(process.isPresent());
+ return runtime;
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
index 50c0e9d19..ba2a8767c 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
@@ -31,7 +31,6 @@ class PeriodicTableSchedulerTestBase extends
PeriodicTableScheduler {
private final CleanupOperation cleanupOperation;
private final boolean enabled;
private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; //
1 hour
- private static final long ORPHAN_FILES_CLEANING_INTERVAL = 24 * 60 * 60 *
1000L; // 1 day
private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60
* 60 * 1000L;
private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour
@@ -73,8 +72,6 @@ class PeriodicTableSchedulerTestBase extends
PeriodicTableScheduler {
switch (cleanupOperation) {
case SNAPSHOTS_EXPIRING:
return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL;
- case ORPHAN_FILES_CLEANING:
- return currentTime - lastCleanupEndTime >=
ORPHAN_FILES_CLEANING_INTERVAL;
case DANGLING_DELETE_FILES_CLEANING:
return currentTime - lastCleanupEndTime >=
DANGLING_DELETE_FILES_CLEANING_INTERVAL;
case DATA_EXPIRING:
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
index de25eac21..fef2a3bd4 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -167,7 +167,6 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
public void testShouldExecuteTaskWithNoPreviousCleanup() {
List<CleanupOperation> operations =
Arrays.asList(
- CleanupOperation.ORPHAN_FILES_CLEANING,
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
@@ -192,7 +191,6 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
public void testShouldNotExecuteTaskWithRecentCleanup() {
List<CleanupOperation> operations =
Arrays.asList(
- CleanupOperation.ORPHAN_FILES_CLEANING,
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
@@ -222,7 +220,6 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
public void testShouldExecuteTaskWithOldCleanup() {
List<CleanupOperation> operations =
Arrays.asList(
- CleanupOperation.ORPHAN_FILES_CLEANING,
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
diff --git
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
index 6c67a2097..ed1b26d53 100644
---
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
+++
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -45,6 +45,11 @@ public class TestLocalExecutionEngine {
@Test
public void testSubmitUsesCustomPoolByTag() throws Exception {
+ assertCustomPoolByTag("snapshots-expiring");
+ assertCustomPoolByTag("orphan-files-cleaning");
+ }
+
+ private void assertCustomPoolByTag(String tag) throws Exception {
engine = createEngineWithTtl("1h");
CountDownLatch started = new CountDownLatch(1);
@@ -54,7 +59,7 @@ public class TestLocalExecutionEngine {
new LocalProcessTableProcess(
mock(TableRuntime.class),
engine,
- "snapshots-expiring",
+ tag,
() -> {
threadName.set(Thread.currentThread().getName());
started.countDown();
@@ -64,7 +69,7 @@ public class TestLocalExecutionEngine {
Assertions.assertTrue(started.await(5, TimeUnit.SECONDS), "process should
start");
Assertions.assertTrue(
- threadName.get() != null &&
threadName.get().startsWith("local-snapshots-expiring-"),
+ threadName.get() != null && threadName.get().startsWith("local-" + tag
+ "-"),
"should run in custom pool");
waitForStatus(identifier, ProcessStatus.SUCCESS, 5000);
@@ -148,6 +153,7 @@ public class TestLocalExecutionEngine {
Map<String, String> properties = new HashMap<>();
properties.put("pool.default.thread-count", "1");
properties.put("pool.snapshots-expiring.thread-count", "1");
+ properties.put("pool.orphan-files-cleaning.thread-count", "1");
properties.put("process.status.ttl", ttl);
localEngine.open(properties);
return localEngine;
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
index eb89c07c0..f906d4b86 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
@@ -1046,7 +1046,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
.latestExpiredSeq;
// only expire delete files with sequence-number less or equal to
expired data file
// there may be some dangling delete files, they will be cleaned by
- // OrphanFileCleaningExecutor
+ // OrphanFilesCleaningProcess
return fileEntry.getFile().dataSequenceNumber() <= seqUpperBound;
} else {
return true;
diff --git a/charts/amoro/templates/amoro-configmap.yaml
b/charts/amoro/templates/amoro-configmap.yaml
index e0638059d..d00566f2c 100644
--- a/charts/amoro/templates/amoro-configmap.yaml
+++ b/charts/amoro/templates/amoro-configmap.yaml
@@ -103,10 +103,6 @@ data:
enabled: true
thread-count: 10
- clean-orphan-files:
- enabled: true
- thread-count: 10
-
clean-dangling-delete-files:
enabled: true
thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index 65446fe2b..e5fc2654a 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -112,11 +112,6 @@ ams:
enabled: true
thread-count: 10
- clean-orphan-files:
- enabled: true
- thread-count: 10
- interval: 1d
-
clean-dangling-delete-files:
enabled: true
thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 5c3199f32..52e062629 100755
--- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -22,3 +22,4 @@ execute-engines:
properties:
pool.default.thread-count: 10
pool.snapshots-expiring.thread-count: 10
+ pool.orphan-files-cleaning.thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index e1455c2cf..5825fde34 100755
--- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -22,3 +22,5 @@ process-factories:
properties:
expire-snapshots.enabled: "true"
expire-snapshots.interval: "1h"
+ clean-orphan-files.enabled: "true"
+ clean-orphan-files.interval: "1d"
diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md
index 57f131533..060923b3c 100644
--- a/docs/admin-guides/deployment.md
+++ b/docs/admin-guides/deployment.md
@@ -247,6 +247,70 @@ scrape_configs:
- targets: ['localhost:9090'] # The host and port that you configured
in Amoro plugins configs file.
```
+### Configure process factories
+
+Process factories manage table maintenance actions like snapshot expiration
and orphan file cleaning.
+They are configured in `$AMORO_CONF_DIR/plugins/process-factories.yaml`.
+
+The configuration format is:
+
+```yaml
+process-factories:
+ - name: # the unified plugin name.
+ enabled: # if this plugin is enabled, default is true.
+ priority: # plugin priority for loading order.
+ properties: # a map defines properties of plugin.
+```
+
+Currently, the `iceberg` process factory is available for Iceberg,
Mixed-Iceberg, and Mixed-Hive formats:
+
+```yaml
+process-factories:
+ - name: iceberg
+ enabled: true
+ priority: 100
+ properties:
+ expire-snapshots.enabled: "true" # enable snapshots expiring
+ expire-snapshots.interval: "1h" # interval for expiring snapshots
+ clean-orphan-files.enabled: "true" # enable orphan files cleaning
+ clean-orphan-files.interval: "1d" # interval for cleaning orphan
files
+```
+
+{{< hint info >}}
+Process-level properties control whether an action is registered. Table-level
properties (see [Table Configurations](../../user-guides/configurations/))
control whether a specific table executes the action.
+{{< /hint >}}
+
+### Configure execute engines
+
+Execute engines define how maintenance processes are executed. The `local`
engine runs processes in AMS thread pools.
+Execute engines are configured in `$AMORO_CONF_DIR/plugins/execute-engines.yaml`.
+
+The configuration format is:
+
+```yaml
+execute-engines:
+ - name: # the unified plugin name.
+ enabled: # if this engine is enabled, default is true.
+ priority: # engine priority for loading order.
+ properties: # a map defines properties of engine.
+```
+
+```yaml
+execute-engines:
+ - name: local
+ enabled: true
+ priority: 100
+ properties:
+ pool.default.thread-count: 10 # default thread pool
size
+ pool.snapshots-expiring.thread-count: 10 # thread pool for
snapshot expiration
+ pool.orphan-files-cleaning.thread-count: 10 # thread pool for orphan
file cleaning
+ process.status.ttl: 4h # TTL for process status
cache
+```
+
+{{< hint info >}}
+Custom pools use the pattern `pool.<name>.thread-count`. A process selects its
pool via its `tag()` method and falls back to the default pool if no custom pool matches.
+{{< /hint >}}
+
### Configure encrypted configuration items
For enhanced security, AMS supports encrypted values for sensitive
configuration items such as passwords within `config.yaml`. This prevents
plaintext passwords and other critical information from being directly exposed
in the configuration file.
Currently, AMS provides built-in support for base64 decryption, and users can
also implement custom decryption algorithms if needed (see [Using Customized
Encryption Method for Configurations](../using-customized-encryption-method/)).
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index c00841dae..aac5c4c42 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -52,9 +52,6 @@ table td:last-child, table th:last-child { width: 40%;
word-break: break-all; }
| clean-dangling-delete-files.enabled | true | Enable dangling delete files
cleaning. |
| clean-dangling-delete-files.interval | 1 d | Interval for cleaning dangling
delete files. |
| clean-dangling-delete-files.thread-count | 10 | The number of threads used
for dangling delete files cleaning. |
-| clean-orphan-files.enabled | true | Enable orphan files cleaning. |
-| clean-orphan-files.interval | 1 d | Interval for cleaning orphan files. |
-| clean-orphan-files.thread-count | 10 | The number of threads used for orphan
files cleaning. |
| data-expiration.enabled | true | Enable data expiration |
| data-expiration.interval | 1 d | Execute interval for data expiration |
| data-expiration.thread-count | 10 | The number of threads used for data
expiring |