This is an automated email from the ASF dual-hosted git repository. czy006 pushed a commit to branch 0.9.x in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 1f1d1fd0528a9e25ba22cf3342cb08bf2e6f71ec Author: WenLingzhang <[email protected]> AuthorDate: Tue May 19 15:36:26 2026 +0800 [AMORO-4215] Refactor data expiring via ProcessFactory plugin (#4218) * Refactor DataExpiringExecutor via ProcessFactory plugin # Conflicts: # amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java # amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java # Conflicts: # amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java # amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java # amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java # amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java # amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java # amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java # amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java # amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java # dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml # dist/src/main/amoro-bin/conf/plugins/process-factories.yaml # docs/admin-guides/deployment.md # docs/configuration/ams-config.md * Add recover logic * Fix logging message formatting in DataExpiringProcess * Fix data expiration property name in documentation Corrected the property name from 'expire-data' to 'data-expiration' in the documentation. * Update using-tables.md --------- Co-authored-by: 张文领 <[email protected]> (cherry picked from commit 38ef38134d54a7381a351b4f17cba42a6c5356ac) --- .../apache/amoro/server/AmoroManagementConf.java | 18 ----- .../apache/amoro/server/AmoroServiceContainer.java | 1 - .../process/iceberg/DataExpiringProcess.java | 80 ++++++++++++++++++++ .../process/iceberg/IcebergProcessFactory.java | 41 ++++++++++- .../scheduler/inline/DataExpiringExecutor.java | 86 ---------------------- .../scheduler/inline/InlineTableExecutors.java | 12 --- .../amoro/server/table/DefaultTableRuntime.java | 5 -- .../server/table/cleanup/CleanupOperation.java | 1 - .../table/cleanup/TableRuntimeCleanupState.java | 3 +- .../process/iceberg/TestIcebergProcessFactory.java | 22 ++++++ .../inline/PeriodicTableSchedulerTestBase.java | 3 - .../inline/TestPeriodicTableSchedulerCleanup.java | 9 +-- .../amoro/process/TestLocalExecutionEngine.java | 2 + charts/amoro/templates/amoro-configmap.yaml | 5 -- dist/src/main/amoro-bin/conf/config.yaml | 5 -- .../amoro-bin/conf/plugins/execute-engines.yaml | 3 +- .../amoro-bin/conf/plugins/process-factories.yaml | 2 + docs/admin-guides/deployment.md | 3 + docs/configuration/ams-config.md | 3 - docs/user-guides/using-tables.md | 2 +- 20 files changed, 155 insertions(+), 151 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 759b76700..fe72b87b6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -663,24 +663,6 @@ public class AmoroManagementConf { .withDescription( "Comma-separated list of sensitive conf keys used to desensitize related value."); - /** configs of data expiration */ - public static final ConfigOption<Boolean> DATA_EXPIRATION_ENABLED = - ConfigOptions.key("data-expiration.enabled") - .booleanType() - .defaultValue(true) - .withDescription("Enable data expiration"); - - public static final ConfigOption<Integer> DATA_EXPIRATION_THREAD_COUNT = - ConfigOptions.key("data-expiration.thread-count") - .intType() - .defaultValue(10) - .withDescription("The number of threads used for data expiring"); - public static final ConfigOption<Duration> DATA_EXPIRATION_INTERVAL = - ConfigOptions.key("data-expiration.interval") - .durationType() - .defaultValue(Duration.ofDays(1)) - .withDescription("Execute interval for data expiration"); - public static final String SYSTEM_CONFIG = "ams"; public static final String CATALOG_CORE_SITE = "core-site"; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 27615ab31..7956fd0bc 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -289,7 +289,6 @@ public class AmoroServiceContainer { InlineTableExecutors.getInstance().setup(tableService, serviceConfig); addHandlerChain(optimizingService.getTableRuntimeHandler()); addHandlerChain(processService.getTableHandlerChain()); - addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DataExpiringProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DataExpiringProcess.java new file mode 100644 index 000000000..d3f9c0cc1 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/DataExpiringProcess.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import org.apache.amoro.Action; +import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.maintainer.TableMaintainer; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.LocalProcess; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.optimizing.maintainer.TableMaintainerFactory; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** Local table process for expiring Iceberg data. */ +public class DataExpiringProcess extends TableProcess implements LocalProcess { + + private static final Logger LOG = LoggerFactory.getLogger(DataExpiringProcess.class); + + public DataExpiringProcess(TableRuntime tableRuntime, ExecuteEngine engine) { + super(tableRuntime, engine); + } + + @Override + public String tag() { + return getAction().getName().toLowerCase(); + } + + @Override + public void run() { + try { + AmoroTable<?> amoroTable = tableRuntime.loadTable(); + TableMaintainer tableMaintainer = TableMaintainerFactory.create(amoroTable, tableRuntime); + tableMaintainer.expireData(); + tableRuntime.updateState( + DefaultTableRuntime.CLEANUP_STATE_KEY, + cleanUp -> cleanUp.setLastDataExpiringTime(System.currentTimeMillis())); + } catch (Throwable t) { + LOG.error("unexpected expire data error of table {}", tableRuntime.getTableIdentifier(), t); + throw new RuntimeException(t); + } + } + + @Override + public Action getAction() { + return IcebergActions.EXPIRE_DATA; + } + + @Override + public Map<String, String> getProcessParameters() { + return Maps.newHashMap(); + } + + @Override + public Map<String, String> getSummary() { + return Maps.newHashMap(); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index db3ed8c74..be94f9a55 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -73,6 +73,12 @@ public class IcebergProcessFactory implements ProcessFactory { .durationType() .defaultValue(Duration.ofDays(1)); + public static final ConfigOption<Boolean> DATA_EXPIRE_ENABLED = + ConfigOptions.key("expire-data.enabled").booleanType().defaultValue(true); + + public static final ConfigOption<Duration> DATA_EXPIRE_INTERVAL = + ConfigOptions.key("expire-data.interval").durationType().defaultValue(Duration.ofDays(1)); + private ExecuteEngine localEngine; private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap(); private final List<TableFormat> formats = @@ -111,6 +117,8 @@ public class IcebergProcessFactory implements ProcessFactory { return triggerCleanOrphans(tableRuntime); } else if (IcebergActions.CLEAN_DANGLING_DELETE.equals(action)) { return triggerCleanDanglingDelete(tableRuntime); + } else if (IcebergActions.EXPIRE_DATA.equals(action)) { + return triggerDataExpiring(tableRuntime); } return Optional.empty(); @@ -127,13 +135,18 @@ public class IcebergProcessFactory implements ProcessFactory { + action); } - // SnapshotsExpiringProcess and OrphanFilesCleaningProcess are stateless, idempotent - // one-shot local maintenance tasks (no checkpoint), so recovery simply rebuilds the - // process so it can run again. The store/processId/tracking is owned by ProcessService. + // SnapshotsExpiringProcess, OrphanFilesCleaningProcess, DanglingDeleteFilesCleaningProcess + // and DataExpiringProcess are stateless, idempotent one-shot local maintenance tasks + // (no checkpoint), so recovery simply rebuilds the process so it can run again. + // The store/processId/tracking is owned by ProcessService. if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { return new SnapshotsExpiringProcess(tableRuntime, localEngine); } else if (IcebergActions.DELETE_ORPHANS.equals(action)) { return new OrphanFilesCleaningProcess(tableRuntime, localEngine); + } else if (IcebergActions.CLEAN_DANGLING_DELETE.equals(action)) { + return new DanglingDeleteFilesCleaningProcess(tableRuntime, localEngine); + } else if (IcebergActions.EXPIRE_DATA.equals(action)) { + return new DataExpiringProcess(tableRuntime, localEngine); } throw new RecoverProcessFailedException( @@ -163,6 +176,12 @@ public class IcebergProcessFactory implements ProcessFactory { this.actions.put( IcebergActions.CLEAN_DANGLING_DELETE, ProcessTriggerStrategy.triggerAtFixRate(interval)); } + + if (configs.getBoolean(DATA_EXPIRE_ENABLED)) { + Duration interval = configs.getDuration(DATA_EXPIRE_INTERVAL); + this.actions.put( + IcebergActions.EXPIRE_DATA, ProcessTriggerStrategy.triggerAtFixRate(interval)); + } } private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) { @@ -213,6 +232,22 @@ public class IcebergProcessFactory implements ProcessFactory { return Optional.of(new DanglingDeleteFilesCleaningProcess(tableRuntime, localEngine)); } + private Optional<TableProcess> triggerDataExpiring(TableRuntime tableRuntime) { + if (localEngine == null + || !tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled()) { + return Optional.empty(); + } + + long lastExecuteTime = + tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastDataExpiringTime(); + ProcessTriggerStrategy strategy = actions.get(IcebergActions.EXPIRE_DATA); + if (System.currentTimeMillis() - lastExecuteTime < strategy.getTriggerInterval().toMillis()) { + return Optional.empty(); + } + + return Optional.of(new DataExpiringProcess(tableRuntime, localEngine)); + } + @Override public void close() {} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java deleted file mode 100644 index 05d72761f..000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.scheduler.inline; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.config.TableConfiguration; -import org.apache.amoro.maintainer.TableMaintainer; -import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; -import org.apache.amoro.server.scheduler.PeriodicTableScheduler; -import org.apache.amoro.server.table.TableService; -import org.apache.amoro.server.table.cleanup.CleanupOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - -public class DataExpiringExecutor extends PeriodicTableScheduler { - - private static final Logger LOG = LoggerFactory.getLogger(DataExpiringExecutor.class); - - private final Duration interval; - - protected DataExpiringExecutor(TableService tableService, int poolSize, Duration interval) { - super(tableService, poolSize); - this.interval = interval; - } - - @Override - protected long getNextExecutingTime(TableRuntime tableRuntime) { - return interval.toMillis(); - } - - @Override - protected boolean shouldExecute(Long lastCleanupEndTime) { - return System.currentTimeMillis() - lastCleanupEndTime >= interval.toMillis(); - } - - @Override - protected CleanupOperation getCleanupOperation() { - return CleanupOperation.DATA_EXPIRING; - } - - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled(); - } - - @Override - public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - scheduleIfNecessary(tableRuntime, getStartDelay()); - } - - @Override - protected long getExecutorDelay() { - return ThreadLocalRandom.current().nextLong(interval.toMillis()); - } - - @Override - protected void execute(TableRuntime tableRuntime) { - try { - AmoroTable<?> amoroTable = loadTable(tableRuntime); - TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, tableRuntime); - tableMaintainer.expireData(); - } catch (Throwable t) { - LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index 4bb45f73a..ec402a90d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -33,7 +33,6 @@ public class InlineTableExecutors { private ProcessDataExpiringExecutor processDataExpiringExecutor; private HiveCommitSyncExecutor hiveCommitSyncExecutor; private TagsAutoCreatingExecutor tagsAutoCreatingExecutor; - private DataExpiringExecutor dataExpiringExecutor; public static InlineTableExecutors getInstance() { return instance; @@ -79,13 +78,6 @@ public class InlineTableExecutors { conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT), conf.get(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL).toMillis()); } - if (conf.getBoolean(AmoroManagementConf.DATA_EXPIRATION_ENABLED)) { - this.dataExpiringExecutor = - new DataExpiringExecutor( - tableService, - conf.getInteger(AmoroManagementConf.DATA_EXPIRATION_THREAD_COUNT), - conf.get(AmoroManagementConf.DATA_EXPIRATION_INTERVAL)); - } } public TableRuntimeRefreshExecutor getTableRefreshingExecutor() { @@ -111,8 +103,4 @@ public class InlineTableExecutors { public TagsAutoCreatingExecutor getTagsAutoCreatingExecutor() { return tagsAutoCreatingExecutor; } - - public DataExpiringExecutor getDataExpiringExecutor() { - return dataExpiringExecutor; - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index 1c6dd25f8..7fd9c075a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -342,8 +342,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime { public long getLastCleanTime(CleanupOperation operation) { TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY); switch (operation) { - case DATA_EXPIRING: - return state.getLastDataExpiringTime(); case SNAPSHOTS_EXPIRING: return state.getLastSnapshotsExpiringTime(); default: @@ -358,9 +356,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime { CLEANUP_STATE_KEY, state -> { switch (operation) { - case DATA_EXPIRING: - state.setLastDataExpiringTime(time); - break; case SNAPSHOTS_EXPIRING: state.setLastSnapshotsExpiringTime(time); break; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java index 478d607f5..958329081 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java @@ -20,7 +20,6 @@ package org.apache.amoro.server.table.cleanup; /** Table cleanup operation enum. Defines different operation types for table cleanup tasks. */ public enum CleanupOperation { - DATA_EXPIRING, SNAPSHOTS_EXPIRING, // NONE indicates operation types where no cleanup process records are // saved in the table_runtime_state table. diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java index 21f112584..8e337b1b4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java @@ -47,8 +47,9 @@ public class TableRuntimeCleanupState { return lastDataExpiringTime; } - public void setLastDataExpiringTime(long lastDataExpiringTime) { + public TableRuntimeCleanupState setLastDataExpiringTime(long lastDataExpiringTime) { this.lastDataExpiringTime = lastDataExpiringTime; + return this; } public long getLastSnapshotsExpiringTime() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java index 9c429803f..362165b45 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -25,6 +25,7 @@ import org.apache.amoro.Action; import org.apache.amoro.IcebergActions; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.DataExpirationConfig; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.process.LocalExecutionEngine; import org.apache.amoro.process.ProcessTriggerStrategy; @@ -52,6 +53,7 @@ public class TestIcebergProcessFactory { "clean-orphan-files", IcebergActions.DELETE_ORPHANS, Duration.ofHours(24)); assertSupportedAction( "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, Duration.ofHours(24)); + assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA, Duration.ofHours(24)); } @Test @@ -65,6 +67,7 @@ public class TestIcebergProcessFactory { IcebergActions.CLEAN_DANGLING_DELETE, DanglingDeleteFilesCleaningProcess.class, 0); + assertTriggerWhenDue("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class, 0); } @Test @@ -77,6 +80,7 @@ public class TestIcebergProcessFactory { "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, System.currentTimeMillis()); + assertTriggerNotDue("expire-data", IcebergActions.EXPIRE_DATA, System.currentTimeMillis()); } @Test @@ -85,6 +89,7 @@ public class TestIcebergProcessFactory { assertTriggerDisabled("clean-orphan-files", IcebergActions.DELETE_ORPHANS, false, 0); assertTriggerDisabled( "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, false, 0); + assertTriggerDisabled("expire-data", IcebergActions.EXPIRE_DATA, false, 0); } @Test @@ -99,6 +104,19 @@ public class TestIcebergProcessFactory { "clean-orphan-files", IcebergActions.DELETE_ORPHANS, OrphanFilesCleaningProcess.class); } + @Test + public void testRecoverCleanDanglingDeleteProcess() { + assertRecover( + "clean-dangling-delete-files", + IcebergActions.CLEAN_DANGLING_DELETE, + DanglingDeleteFilesCleaningProcess.class); + } + + @Test + public void testRecoverDataExpiringProcess() { + assertRecover("expire-data", IcebergActions.EXPIRE_DATA, DataExpiringProcess.class); + } + @Test public void testRecoverUnsupportedActionThrows() { IcebergProcessFactory factory = openedFactory("expire-snapshots"); @@ -239,6 +257,8 @@ public class TestIcebergProcessFactory { tableConfiguration.setCleanOrphanEnabled(enabled); } else if ("clean-dangling-delete-files".equals(configKey)) { tableConfiguration.setDeleteDanglingDeleteFilesEnabled(enabled); + } else if ("expire-data".equals(configKey)) { + tableConfiguration.setExpiringDataConfig(new DataExpirationConfig().setEnabled(enabled)); } TableRuntimeCleanupState cleanupState = new TableRuntimeCleanupState(); @@ -248,6 +268,8 @@ public class TestIcebergProcessFactory { cleanupState.setLastOrphanFilesCleanTime(lastTime); } else if ("clean-dangling-delete-files".equals(configKey)) { cleanupState.setLastDanglingDeleteFilesCleanTime(lastTime); + } else if ("expire-data".equals(configKey)) { + cleanupState.setLastDataExpiringTime(lastTime); } TableRuntime runtime = mock(TableRuntime.class); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java index f1c13a666..70aed2fce 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java @@ -31,7 +31,6 @@ class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { private final CleanupOperation cleanupOperation; private final boolean enabled; private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour - private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour public PeriodicTableSchedulerTestBase( TableService tableService, CleanupOperation cleanupOperation, boolean enabled) { @@ -71,8 +70,6 @@ class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { switch (cleanupOperation) { case SNAPSHOTS_EXPIRING: return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL; - case DATA_EXPIRING: - return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL; default: return true; } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java index 7bee81602..40939505d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java @@ -165,8 +165,7 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { */ @Test public void testShouldExecuteTaskWithNoPreviousCleanup() { - List<CleanupOperation> operations = - Arrays.asList(CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); + List<CleanupOperation> operations = Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING); for (CleanupOperation operation : operations) { List<Long> testTableIds = Collections.singletonList(1L); @@ -186,8 +185,7 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { /** Test should not execute task with recent cleanup */ @Test public void testShouldNotExecuteTaskWithRecentCleanup() { - List<CleanupOperation> operations = - Arrays.asList(CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); + List<CleanupOperation> operations = Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING); for (CleanupOperation operation : operations) { List<Long> testTableIds = Collections.singletonList(1L); @@ -212,8 +210,7 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase { /** Test should execute task with old cleanup */ @Test public void testShouldExecuteTaskWithOldCleanup() { - List<CleanupOperation> operations = - Arrays.asList(CleanupOperation.DATA_EXPIRING, CleanupOperation.SNAPSHOTS_EXPIRING); + List<CleanupOperation> operations = Arrays.asList(CleanupOperation.SNAPSHOTS_EXPIRING); for (CleanupOperation operation : operations) { List<Long> testTableIds = Collections.singletonList(1L); diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java index 31a9331ee..e85f96085 100644 --- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java @@ -48,6 +48,7 @@ public class TestLocalExecutionEngine { assertCustomPoolByTag("snapshots-expiring"); assertCustomPoolByTag("orphan-files-cleaning"); assertCustomPoolByTag("clean-dangling-delete-files"); + assertCustomPoolByTag("expire-data"); } private void assertCustomPoolByTag(String tag) throws Exception { @@ -156,6 +157,7 @@ public class TestLocalExecutionEngine { properties.put("pool.snapshots-expiring.thread-count", "1"); properties.put("pool.orphan-files-cleaning.thread-count", "1"); properties.put("pool.clean-dangling-delete-files.thread-count", "1"); + properties.put("pool.expire-data.thread-count", "1"); properties.put("process.status.ttl", ttl); localEngine.open(properties); return localEngine; diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index b11d2bed3..0bfbfaf91 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -107,11 +107,6 @@ data: enabled: false thread-count: 10 - data-expiration: - enabled: true - thread-count: 10 - interval: 1d - auto-create-tags: enabled: true thread-count: 3 diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index e4e9399f2..866eda219 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -116,11 +116,6 @@ ams: enabled: false thread-count: 10 - data-expiration: - enabled: true - thread-count: 10 - interval: 1d - auto-create-tags: enabled: true thread-count: 3 diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml index 5eefcd798..1dfde29eb 100755 --- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -23,4 +23,5 @@ execute-engines: pool.default.thread-count: 10 pool.snapshots-expiring.thread-count: 10 pool.orphan-files-cleaning.thread-count: 10 - pool.clean-dangling-delete-files.thread-count: 10 \ No newline at end of file + pool.clean-dangling-delete-files.thread-count: 10 + pool.expire-data.thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml index ff7719384..2adb757c3 100755 --- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -26,3 +26,5 @@ process-factories: clean-orphan-files.interval: "1d" clean-dangling-delete-files.enabled: "true" clean-dangling-delete-files.interval: "1d" + expire-data.enabled: "true" + expire-data.interval: "1d" diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md index 216b1a3e4..e99fb984e 100644 --- a/docs/admin-guides/deployment.md +++ b/docs/admin-guides/deployment.md @@ -276,6 +276,8 @@ process-factories: clean-orphan-files.interval: "1d" # interval for cleaning orphan files clean-dangling-delete-files.enabled: "true" # enable dangling delete files cleaning clean-dangling-delete-files.interval: "1d" # interval for cleaning dangling delete files + expire-data.enabled: "true" # enable data expiration + expire-data.interval: "1d" # interval for data expiration ``` {{< hint info >}} @@ -307,6 +309,7 @@ execute-engines: pool.snapshots-expiring.thread-count: 10 # thread pool for snapshot expiration pool.orphan-files-cleaning.thread-count: 10 # thread pool for orphan file cleaning pool.clean-dangling-delete-files.thread-count: 10 # thread pool for dangling delete files cleaning + pool.expire-data.thread-count: 10 # thread pool for data expiration process.status.ttl: 4h # TTL for process status cache ``` diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index 51428b605..0a12a0038 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -49,9 +49,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | auto-create-tags.thread-count | 3 | The number of threads used for creating tags. | | blocker.timeout | 1 min | Session timeout. Default unit is milliseconds if not specified. | | catalog-meta-cache.expiration-interval | 1 min | TTL for catalog metadata. | -| data-expiration.enabled | true | Enable data expiration | -| data-expiration.interval | 1 d | Execute interval for data expiration | -| data-expiration.thread-count | 10 | The number of threads used for data expiring | | database.auto-create-tables | true | Auto init table schema when started | | database.connection-pool-max-idle | 16 | Max idle connect count of database connect pool. | | database.connection-pool-max-total | 20 | Max connect count of database connect pool. | diff --git a/docs/user-guides/using-tables.md b/docs/user-guides/using-tables.md index 6410c2247..042bfb503 100644 --- a/docs/user-guides/using-tables.md +++ b/docs/user-guides/using-tables.md @@ -160,7 +160,7 @@ ALTER TABLE test_db.test_log_store set tblproperties ( ## Configure data expiration Amoro can periodically clean data based on the table's expiration policy, which includes properties such as whether to enable expiration, retention duration, expiration level, and the selection of the field for expiration. -it's also necessary for AMS to have the data expiration thread enabled. You can enable the 'data-expiration' property in the configuration file +it's also necessary for AMS to have the data expiration process enabled. You can enable the 'expire-data' property in the process-factories plugin configuration file ### Enable or disable data expiration
