This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 38ef38134 [AMORO-4215] Refactor data expiring via ProcessFactory
plugin (#4218)
38ef38134 is described below
commit 38ef38134d54a7381a351b4f17cba42a6c5356ac
Author: WenLingzhang <[email protected]>
AuthorDate: Tue May 19 15:36:26 2026 +0800
[AMORO-4215] Refactor data expiring via ProcessFactory plugin (#4218)
* Refactor DataExpiringExecutor via ProcessFactory plugin
# Conflicts:
#
amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
#
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
# Conflicts:
#
amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
#
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
#
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
#
amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
#
amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
#
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
#
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
#
amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
# dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
# dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
# docs/admin-guides/deployment.md
# docs/configuration/ams-config.md
* Add recover logic
* Fix logging message formatting in DataExpiringProcess
* Fix data expiration property name in documentation
Corrected the property name from 'expire-data' to 'data-expiration' in the
documentation.
* Update using-tables.md
---------
Co-authored-by: 张文领 <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 18 -----
.../apache/amoro/server/AmoroServiceContainer.java | 1 -
.../process/iceberg/DataExpiringProcess.java | 80 ++++++++++++++++++++
.../process/iceberg/IcebergProcessFactory.java | 41 ++++++++++-
.../scheduler/inline/DataExpiringExecutor.java | 86 ----------------------
.../scheduler/inline/InlineTableExecutors.java | 12 ---
.../amoro/server/table/DefaultTableRuntime.java | 5 --
.../server/table/cleanup/CleanupOperation.java | 1 -
.../table/cleanup/TableRuntimeCleanupState.java | 3 +-
.../process/iceberg/TestIcebergProcessFactory.java | 22 ++++++
.../inline/PeriodicTableSchedulerTestBase.java | 3 -
.../inline/TestPeriodicTableSchedulerCleanup.java | 9 +--
.../amoro/process/TestLocalExecutionEngine.java | 2 +
charts/amoro/templates/amoro-configmap.yaml | 5 --
dist/src/main/amoro-bin/conf/config.yaml | 5 --
.../amoro-bin/conf/plugins/execute-engines.yaml | 3 +-
.../amoro-bin/conf/plugins/process-factories.yaml | 2 +
docs/admin-guides/deployment.md | 3 +
docs/configuration/ams-config.md | 3 -
docs/user-guides/using-tables.md | 2 +-
20 files changed, 155 insertions(+), 151 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 759b76700..fe72b87b6 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -663,24 +663,6 @@ public class AmoroManagementConf {
.withDescription(
"Comma-separated list of sensitive conf keys used to desensitize
related value.");
- /** configs of data expiration */
- public static final ConfigOption<Boolean> DATA_EXPIRATION_ENABLED =
- ConfigOptions.key("data-expiration.enabled")
- .booleanType()
- .defaultValue(true)
- .withDescription("Enable data expiration");
-
- public static final ConfigOption<Integer> DATA_EXPIRATION_THREAD_COUNT =
- ConfigOptions.key("data-expiration.thread-count")
- .intType()
- .defaultValue(10)
- .withDescription("The number of threads used for data expiring");
- public static final ConfigOption<Duration> DATA_EXPIRATION_INTERVAL =
- ConfigOptions.key("data-expiration.interval")
- .durationType()
- .defaultValue(Duration.ofDays(1))
- .withDescription("Execute interval for data expiration");
-
public static final String SYSTEM_CONFIG = "ams";
public static final String CATALOG_CORE_SITE = "core-site";
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 27615ab31..7956fd0bc 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -289,7 +289,6 @@ public class AmoroServiceContainer {
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(processService.getTableHandlerChain());
-
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DataExpiringProcess.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DataExpiringProcess.java
new file mode 100644
index 000000000..d3f9c0cc1
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DataExpiringProcess.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalProcess;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainerFactory;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Local table process for expiring Iceberg data. */
+public class DataExpiringProcess extends TableProcess implements LocalProcess {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataExpiringProcess.class);
+
+ public DataExpiringProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
+ super(tableRuntime, engine);
+ }
+
+ @Override
+ public String tag() {
+ return getAction().getName().toLowerCase();
+ }
+
+ @Override
+ public void run() {
+ try {
+ AmoroTable<?> amoroTable = tableRuntime.loadTable();
+ TableMaintainer tableMaintainer =
TableMaintainerFactory.create(amoroTable, tableRuntime);
+ tableMaintainer.expireData();
+ tableRuntime.updateState(
+ DefaultTableRuntime.CLEANUP_STATE_KEY,
+ cleanUp ->
cleanUp.setLastDataExpiringTime(System.currentTimeMillis()));
+ } catch (Throwable t) {
+ LOG.error("unexpected expire data error of table {}",
tableRuntime.getTableIdentifier(), t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ @Override
+ public Action getAction() {
+ return IcebergActions.EXPIRE_DATA;
+ }
+
+ @Override
+ public Map<String, String> getProcessParameters() {
+ return Maps.newHashMap();
+ }
+
+ @Override
+ public Map<String, String> getSummary() {
+ return Maps.newHashMap();
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index db3ed8c74..be94f9a55 100755
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -73,6 +73,12 @@ public class IcebergProcessFactory implements ProcessFactory
{
.durationType()
.defaultValue(Duration.ofDays(1));
+ public static final ConfigOption<Boolean> DATA_EXPIRE_ENABLED =
+
ConfigOptions.key("expire-data.enabled").booleanType().defaultValue(true);
+
+ public static final ConfigOption<Duration> DATA_EXPIRE_INTERVAL =
+
ConfigOptions.key("expire-data.interval").durationType().defaultValue(Duration.ofDays(1));
+
private ExecuteEngine localEngine;
private final Map<Action, ProcessTriggerStrategy> actions =
Maps.newHashMap();
private final List<TableFormat> formats =
@@ -111,6 +117,8 @@ public class IcebergProcessFactory implements
ProcessFactory {
return triggerCleanOrphans(tableRuntime);
} else if (IcebergActions.CLEAN_DANGLING_DELETE.equals(action)) {
return triggerCleanDanglingDelete(tableRuntime);
+ } else if (IcebergActions.EXPIRE_DATA.equals(action)) {
+ return triggerDataExpiring(tableRuntime);
}
return Optional.empty();
@@ -127,13 +135,18 @@ public class IcebergProcessFactory implements
ProcessFactory {
+ action);
}
- // SnapshotsExpiringProcess and OrphanFilesCleaningProcess are stateless,
idempotent
- // one-shot local maintenance tasks (no checkpoint), so recovery simply
rebuilds the
- // process so it can run again. The store/processId/tracking is owned by
ProcessService.
+ // SnapshotsExpiringProcess, OrphanFilesCleaningProcess,
DanglingDeleteFilesCleaningProcess
+ // and DataExpiringProcess are stateless, idempotent one-shot local
maintenance tasks
+ // (no checkpoint), so recovery simply rebuilds the process so it can run
again.
+ // The store/processId/tracking is owned by ProcessService.
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
return new SnapshotsExpiringProcess(tableRuntime, localEngine);
} else if (IcebergActions.DELETE_ORPHANS.equals(action)) {
return new OrphanFilesCleaningProcess(tableRuntime, localEngine);
+ } else if (IcebergActions.CLEAN_DANGLING_DELETE.equals(action)) {
+ return new DanglingDeleteFilesCleaningProcess(tableRuntime, localEngine);
+ } else if (IcebergActions.EXPIRE_DATA.equals(action)) {
+ return new DataExpiringProcess(tableRuntime, localEngine);
}
throw new RecoverProcessFailedException(
@@ -163,6 +176,12 @@ public class IcebergProcessFactory implements
ProcessFactory {
this.actions.put(
IcebergActions.CLEAN_DANGLING_DELETE,
ProcessTriggerStrategy.triggerAtFixRate(interval));
}
+
+ if (configs.getBoolean(DATA_EXPIRE_ENABLED)) {
+ Duration interval = configs.getDuration(DATA_EXPIRE_INTERVAL);
+ this.actions.put(
+ IcebergActions.EXPIRE_DATA,
ProcessTriggerStrategy.triggerAtFixRate(interval));
+ }
}
private Optional<TableProcess> triggerExpireSnapshot(TableRuntime
tableRuntime) {
@@ -213,6 +232,22 @@ public class IcebergProcessFactory implements
ProcessFactory {
return Optional.of(new DanglingDeleteFilesCleaningProcess(tableRuntime,
localEngine));
}
+ private Optional<TableProcess> triggerDataExpiring(TableRuntime
tableRuntime) {
+ if (localEngine == null
+ ||
!tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled()) {
+ return Optional.empty();
+ }
+
+ long lastExecuteTime =
+
tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastDataExpiringTime();
+ ProcessTriggerStrategy strategy = actions.get(IcebergActions.EXPIRE_DATA);
+ if (System.currentTimeMillis() - lastExecuteTime <
strategy.getTriggerInterval().toMillis()) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new DataExpiringProcess(tableRuntime, localEngine));
+ }
+
@Override
public void close() {}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
deleted file mode 100644
index 05d72761f..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.AmoroTable;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.maintainer.TableMaintainer;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.server.table.cleanup.CleanupOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class DataExpiringExecutor extends PeriodicTableScheduler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(DataExpiringExecutor.class);
-
- private final Duration interval;
-
- protected DataExpiringExecutor(TableService tableService, int poolSize,
Duration interval) {
- super(tableService, poolSize);
- this.interval = interval;
- }
-
- @Override
- protected long getNextExecutingTime(TableRuntime tableRuntime) {
- return interval.toMillis();
- }
-
- @Override
- protected boolean shouldExecute(Long lastCleanupEndTime) {
- return System.currentTimeMillis() - lastCleanupEndTime >=
interval.toMillis();
- }
-
- @Override
- protected CleanupOperation getCleanupOperation() {
- return CleanupOperation.DATA_EXPIRING;
- }
-
- @Override
- protected boolean enabled(TableRuntime tableRuntime) {
- return
tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled();
- }
-
- @Override
- public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
- scheduleIfNecessary(tableRuntime, getStartDelay());
- }
-
- @Override
- protected long getExecutorDelay() {
- return ThreadLocalRandom.current().nextLong(interval.toMillis());
- }
-
- @Override
- protected void execute(TableRuntime tableRuntime) {
- try {
- AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
- tableMaintainer.expireData();
- } catch (Throwable t) {
- LOG.error("unexpected expire error of table {} ",
tableRuntime.getTableIdentifier(), t);
- }
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 4bb45f73a..ec402a90d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -33,7 +33,6 @@ public class InlineTableExecutors {
private ProcessDataExpiringExecutor processDataExpiringExecutor;
private HiveCommitSyncExecutor hiveCommitSyncExecutor;
private TagsAutoCreatingExecutor tagsAutoCreatingExecutor;
- private DataExpiringExecutor dataExpiringExecutor;
public static InlineTableExecutors getInstance() {
return instance;
@@ -79,13 +78,6 @@ public class InlineTableExecutors {
conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT),
conf.get(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL).toMillis());
}
- if (conf.getBoolean(AmoroManagementConf.DATA_EXPIRATION_ENABLED)) {
- this.dataExpiringExecutor =
- new DataExpiringExecutor(
- tableService,
-
conf.getInteger(AmoroManagementConf.DATA_EXPIRATION_THREAD_COUNT),
- conf.get(AmoroManagementConf.DATA_EXPIRATION_INTERVAL));
- }
}
public TableRuntimeRefreshExecutor getTableRefreshingExecutor() {
@@ -111,8 +103,4 @@ public class InlineTableExecutors {
public TagsAutoCreatingExecutor getTagsAutoCreatingExecutor() {
return tagsAutoCreatingExecutor;
}
-
- public DataExpiringExecutor getDataExpiringExecutor() {
- return dataExpiringExecutor;
- }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 1c6dd25f8..7fd9c075a 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -342,8 +342,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
public long getLastCleanTime(CleanupOperation operation) {
TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY);
switch (operation) {
- case DATA_EXPIRING:
- return state.getLastDataExpiringTime();
case SNAPSHOTS_EXPIRING:
return state.getLastSnapshotsExpiringTime();
default:
@@ -358,9 +356,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
CLEANUP_STATE_KEY,
state -> {
switch (operation) {
- case DATA_EXPIRING:
- state.setLastDataExpiringTime(time);
- break;
case SNAPSHOTS_EXPIRING:
state.setLastSnapshotsExpiringTime(time);
break;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
index 478d607f5..958329081 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java
@@ -20,7 +20,6 @@ package org.apache.amoro.server.table.cleanup;
/** Table cleanup operation enum. Defines different operation types for table
cleanup tasks. */
public enum CleanupOperation {
- DATA_EXPIRING,
SNAPSHOTS_EXPIRING,
// NONE indicates operation types where no cleanup process records are
// saved in the table_runtime_state table.
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
index 21f112584..8e337b1b4 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -47,8 +47,9 @@ public class TableRuntimeCleanupState {
return lastDataExpiringTime;
}
- public void setLastDataExpiringTime(long lastDataExpiringTime) {
+ public TableRuntimeCleanupState setLastDataExpiringTime(long
lastDataExpiringTime) {
this.lastDataExpiringTime = lastDataExpiringTime;
+ return this;
}
public long getLastSnapshotsExpiringTime() {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 9c429803f..362165b45 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -25,6 +25,7 @@ import org.apache.amoro.Action;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.LocalExecutionEngine;
import org.apache.amoro.process.ProcessTriggerStrategy;
@@ -52,6 +53,7 @@ public class TestIcebergProcessFactory {
"clean-orphan-files", IcebergActions.DELETE_ORPHANS,
Duration.ofHours(24));
assertSupportedAction(
"clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE,
Duration.ofHours(24));
+ assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA,
Duration.ofHours(24));
}
@Test
@@ -65,6 +67,7 @@ public class TestIcebergProcessFactory {
IcebergActions.CLEAN_DANGLING_DELETE,
DanglingDeleteFilesCleaningProcess.class,
0);
+ assertTriggerWhenDue("expire-data", IcebergActions.EXPIRE_DATA,
DataExpiringProcess.class, 0);
}
@Test
@@ -77,6 +80,7 @@ public class TestIcebergProcessFactory {
"clean-dangling-delete-files",
IcebergActions.CLEAN_DANGLING_DELETE,
System.currentTimeMillis());
+ assertTriggerNotDue("expire-data", IcebergActions.EXPIRE_DATA,
System.currentTimeMillis());
}
@Test
@@ -85,6 +89,7 @@ public class TestIcebergProcessFactory {
assertTriggerDisabled("clean-orphan-files", IcebergActions.DELETE_ORPHANS,
false, 0);
assertTriggerDisabled(
"clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE,
false, 0);
+ assertTriggerDisabled("expire-data", IcebergActions.EXPIRE_DATA, false, 0);
}
@Test
@@ -99,6 +104,19 @@ public class TestIcebergProcessFactory {
"clean-orphan-files", IcebergActions.DELETE_ORPHANS,
OrphanFilesCleaningProcess.class);
}
+ @Test
+ public void testRecoverCleanDanglingDeleteProcess() {
+ assertRecover(
+ "clean-dangling-delete-files",
+ IcebergActions.CLEAN_DANGLING_DELETE,
+ DanglingDeleteFilesCleaningProcess.class);
+ }
+
+ @Test
+ public void testRecoverDataExpiringProcess() {
+ assertRecover("expire-data", IcebergActions.EXPIRE_DATA,
DataExpiringProcess.class);
+ }
+
@Test
public void testRecoverUnsupportedActionThrows() {
IcebergProcessFactory factory = openedFactory("expire-snapshots");
@@ -239,6 +257,8 @@ public class TestIcebergProcessFactory {
tableConfiguration.setCleanOrphanEnabled(enabled);
} else if ("clean-dangling-delete-files".equals(configKey)) {
tableConfiguration.setDeleteDanglingDeleteFilesEnabled(enabled);
+ } else if ("expire-data".equals(configKey)) {
+ tableConfiguration.setExpiringDataConfig(new
DataExpirationConfig().setEnabled(enabled));
}
TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState();
@@ -248,6 +268,8 @@ public class TestIcebergProcessFactory {
cleanupState.setLastOrphanFilesCleanTime(lastTime);
} else if ("clean-dangling-delete-files".equals(configKey)) {
cleanupState.setLastDanglingDeleteFilesCleanTime(lastTime);
+ } else if ("expire-data".equals(configKey)) {
+ cleanupState.setLastDataExpiringTime(lastTime);
}
TableRuntime runtime = mock(TableRuntime.class);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
index f1c13a666..70aed2fce 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java
@@ -31,7 +31,6 @@ class PeriodicTableSchedulerTestBase extends
PeriodicTableScheduler {
private final CleanupOperation cleanupOperation;
private final boolean enabled;
private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; //
1 hour
- private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour
public PeriodicTableSchedulerTestBase(
TableService tableService, CleanupOperation cleanupOperation, boolean
enabled) {
@@ -71,8 +70,6 @@ class PeriodicTableSchedulerTestBase extends
PeriodicTableScheduler {
switch (cleanupOperation) {
case SNAPSHOTS_EXPIRING:
return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL;
- case DATA_EXPIRING:
- return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL;
default:
return true;
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
index 7bee81602..40939505d 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -165,8 +165,7 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
*/
@Test
public void testShouldExecuteTaskWithNoPreviousCleanup() {
- List<CleanupOperation> operations =
- Arrays.asList(CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
+ List<CleanupOperation> operations =
Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING);
for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
@@ -186,8 +185,7 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
/** Test should not execute task with recent cleanup */
@Test
public void testShouldNotExecuteTaskWithRecentCleanup() {
- List<CleanupOperation> operations =
- Arrays.asList(CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
+ List<CleanupOperation> operations =
Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING);
for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
@@ -212,8 +210,7 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
/** Test should execute task with old cleanup */
@Test
public void testShouldExecuteTaskWithOldCleanup() {
- List<CleanupOperation> operations =
- Arrays.asList(CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);
+ List<CleanupOperation> operations =
Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING);
for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
diff --git
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
index 31a9331ee..e85f96085 100644
---
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
+++
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -48,6 +48,7 @@ public class TestLocalExecutionEngine {
assertCustomPoolByTag("snapshots-expiring");
assertCustomPoolByTag("orphan-files-cleaning");
assertCustomPoolByTag("clean-dangling-delete-files");
+ assertCustomPoolByTag("expire-data");
}
private void assertCustomPoolByTag(String tag) throws Exception {
@@ -156,6 +157,7 @@ public class TestLocalExecutionEngine {
properties.put("pool.snapshots-expiring.thread-count", "1");
properties.put("pool.orphan-files-cleaning.thread-count", "1");
properties.put("pool.clean-dangling-delete-files.thread-count", "1");
+ properties.put("pool.expire-data.thread-count", "1");
properties.put("process.status.ttl", ttl);
localEngine.open(properties);
return localEngine;
diff --git a/charts/amoro/templates/amoro-configmap.yaml
b/charts/amoro/templates/amoro-configmap.yaml
index b11d2bed3..0bfbfaf91 100644
--- a/charts/amoro/templates/amoro-configmap.yaml
+++ b/charts/amoro/templates/amoro-configmap.yaml
@@ -107,11 +107,6 @@ data:
enabled: false
thread-count: 10
- data-expiration:
- enabled: true
- thread-count: 10
- interval: 1d
-
auto-create-tags:
enabled: true
thread-count: 3
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index e4e9399f2..866eda219 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -116,11 +116,6 @@ ams:
enabled: false
thread-count: 10
- data-expiration:
- enabled: true
- thread-count: 10
- interval: 1d
-
auto-create-tags:
enabled: true
thread-count: 3
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 5eefcd798..1dfde29eb 100755
--- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -23,4 +23,5 @@ execute-engines:
pool.default.thread-count: 10
pool.snapshots-expiring.thread-count: 10
pool.orphan-files-cleaning.thread-count: 10
- pool.clean-dangling-delete-files.thread-count: 10
\ No newline at end of file
+ pool.clean-dangling-delete-files.thread-count: 10
+ pool.expire-data.thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index ff7719384..2adb757c3 100755
--- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -26,3 +26,5 @@ process-factories:
clean-orphan-files.interval: "1d"
clean-dangling-delete-files.enabled: "true"
clean-dangling-delete-files.interval: "1d"
+ expire-data.enabled: "true"
+ expire-data.interval: "1d"
diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md
index 216b1a3e4..e99fb984e 100644
--- a/docs/admin-guides/deployment.md
+++ b/docs/admin-guides/deployment.md
@@ -276,6 +276,8 @@ process-factories:
clean-orphan-files.interval: "1d" # interval for cleaning orphan
files
clean-dangling-delete-files.enabled: "true" # enable dangling delete
files cleaning
clean-dangling-delete-files.interval: "1d" # interval for cleaning
dangling delete files
+ expire-data.enabled: "true" # enable data expiration
+ expire-data.interval: "1d" # interval for data expiration
```
{{< hint info >}}
@@ -307,6 +309,7 @@ execute-engines:
pool.snapshots-expiring.thread-count: 10 # thread pool for
snapshot expiration
pool.orphan-files-cleaning.thread-count: 10 # thread pool for orphan
file cleaning
pool.clean-dangling-delete-files.thread-count: 10 # thread pool for
dangling delete files cleaning
+ pool.expire-data.thread-count: 10 # thread pool for data
expiration
process.status.ttl: 4h # TTL for process status
cache
```
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 51428b605..0a12a0038 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -49,9 +49,6 @@ table td:last-child, table th:last-child { width: 40%;
word-break: break-all; }
| auto-create-tags.thread-count | 3 | The number of threads used for creating
tags. |
| blocker.timeout | 1 min | Session timeout. Default unit is milliseconds if
not specified. |
| catalog-meta-cache.expiration-interval | 1 min | TTL for catalog metadata. |
-| data-expiration.enabled | true | Enable data expiration |
-| data-expiration.interval | 1 d | Execute interval for data expiration |
-| data-expiration.thread-count | 10 | The number of threads used for data
expiring |
| database.auto-create-tables | true | Auto init table schema when started |
| database.connection-pool-max-idle | 16 | Max idle connect count of database
connect pool. |
| database.connection-pool-max-total | 20 | Max connect count of database
connect pool. |
diff --git a/docs/user-guides/using-tables.md b/docs/user-guides/using-tables.md
index 6410c2247..042bfb503 100644
--- a/docs/user-guides/using-tables.md
+++ b/docs/user-guides/using-tables.md
@@ -160,7 +160,7 @@ ALTER TABLE test_db.test_log_store set tblproperties (
## Configure data expiration
Amoro can periodically clean data based on the table's expiration policy,
which includes properties such as whether to enable expiration, retention
duration, expiration level, and the selection of the field for expiration.
-it's also necessary for AMS to have the data expiration thread enabled. You
can enable the 'data-expiration' property in the configuration file
+it's also necessary for AMS to have the data expiration process enabled. You
can enable the 'expire-data' property in the process-factories plugin
configuration file
### Enable or disable data expiration