This is an automated email from the ASF dual-hosted git repository. zhoujinsong pushed a commit to branch revert-4247-AMORO-ProcessDataExpiringExecutor-refactor-dev in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 2440dad6e75a616b9c5d672dd0e6877e8de3228c Author: ZhouJinsong <[email protected]> AuthorDate: Fri Jun 12 15:57:43 2026 +0800 Revert "[AMORO-4246] Refactor process data expiring via ProcessFactory plugin…" This reverts commit acf8ef47a3d9305ef217399a9aeb8c6d312bbe75. --- .../apache/amoro/server/AmoroManagementConf.java | 50 +++++++++++++++ .../apache/amoro/server/AmoroServiceContainer.java | 1 + .../process/iceberg/IcebergProcessFactory.java | 60 ------------------ .../scheduler/inline/InlineTableExecutors.java | 24 ++++++++ .../inline/ProcessDataExpiringExecutor.java} | 72 ++++++++++------------ .../org/apache/amoro/server/AmsEnvironment.java | 2 + .../amoro/server/TestAmoroManagementConf.java | 26 ++++++++ .../process/iceberg/TestIcebergProcessFactory.java | 40 ++++-------- .../inline/TestConfigurableIntervalExecutors.java | 56 +++++++++++++++++ .../inline/TestProcessDataExpiringExecutor.java} | 38 ++++++------ .../main/java/org/apache/amoro/IcebergActions.java | 1 - .../amoro/process/TestLocalExecutionEngine.java | 2 - dist/src/main/amoro-bin/conf/config.yaml | 5 ++ .../amoro-bin/conf/plugins/execute-engines.yaml | 3 +- .../amoro-bin/conf/plugins/process-factories.yaml | 3 - docs/admin-guides/deployment.md | 4 -- docs/configuration/ams-config.md | 6 ++ 17 files changed, 233 insertions(+), 160 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 2ac6f5cfb..f38f6f5ac 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -395,6 +395,38 @@ public class AmoroManagementConf { .defaultValue(10) .withDescription("The number of threads that self-optimizing uses to submit results."); + /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_KEEP_TIME} instead. */ + @Deprecated + public static final ConfigOption<Integer> OPTIMIZING_RUNTIME_DATA_KEEP_DAYS = + ConfigOptions.key("self-optimizing.runtime-data-keep-days") + .intType() + .defaultValue(30) + .withDescription( + "Deprecated: use 'self-optimizing.runtime-data-keep-time' instead. " + + "The number of days that self-optimizing runtime data keeps the runtime."); + + /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL} instead. */ + @Deprecated + public static final ConfigOption<Integer> OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS = + ConfigOptions.key("self-optimizing.runtime-data-expire-interval-hours") + .intType() + .defaultValue(1) + .withDescription( + "Deprecated: use 'self-optimizing.runtime-data-expire-interval' instead. " + + "The number of hours that self-optimizing runtime data expire interval."); + + public static final ConfigOption<Duration> OPTIMIZING_RUNTIME_DATA_KEEP_TIME = + ConfigOptions.key("self-optimizing.runtime-data-keep-time") + .durationType() + .defaultValue(Duration.ofDays(30)) + .withDescription("Duration that self-optimizing runtime data is retained."); + + public static final ConfigOption<Duration> OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL = + ConfigOptions.key("self-optimizing.runtime-data-expire-interval") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDescription("Interval between self-optimizing runtime data expiration runs."); + public static final ConfigOption<Boolean> OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED = ConfigOptions.key("self-optimizing.break-quota-limit-enabled") .booleanType() @@ -402,6 +434,24 @@ public class AmoroManagementConf { .withDescription( "Allow the table to break the quota limit when the resource is sufficient."); + /** @deprecated Use {@link #PROCESS_HISTORY_DATA_KEEP_TIME} instead. */ + @Deprecated + public static final ConfigOption<Integer> PROCESS_HISTORY_DATA_KEEP_DAYS = + ConfigOptions.key("process.history-data-keep-days") + .intType() + .defaultValue(7) + .withDescription( + "Deprecated: use 'process.history-data-keep-time' instead. " + + "The number of days that process history data is retained."); + + public static final ConfigOption<Duration> PROCESS_HISTORY_DATA_KEEP_TIME = + ConfigOptions.key("process.history-data-keep-time") + .durationType() + .defaultValue(Duration.ofDays(7)) + .withDescription( + "Duration that process history data is retained. " + + "Expired terminal process records will be deleted automatically."); + public static final ConfigOption<Duration> OVERVIEW_CACHE_REFRESH_INTERVAL = ConfigOptions.key("overview-cache.refresh-interval") .durationType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index c717d71b8..638354983 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -290,6 +290,7 @@ public class AmoroServiceContainer { addHandlerChain(optimizingService.getTableRuntimeHandler()); addHandlerChain(processService.getTableHandlerChain()); addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); + addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor()); tableService.initialize(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index 9f9f901de..6df589cc9 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -95,36 +95,7 @@ public class IcebergProcessFactory implements ProcessFactory { .durationType() .defaultValue(Duration.ofMinutes(10)); - public static final ConfigOption<Duration> EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL = - ConfigOptions.key("expire-process-data.runtime-data-expire-interval") - .durationType() - .defaultValue(Duration.ofHours(1)) - .withDescription( - "The interval for expiring process runtime data, " - + "including optimizing runtime data and completed process metadata."); - - public static final ConfigOption<Duration> EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME = - ConfigOptions.key("expire-process-data.runtime-data-keep-time") - .durationType() - .defaultValue(Duration.ofDays(30)) - .withDescription( - "The maximum retention time for process runtime data " - + "(e.g., process records, process states, task runtimes, optimizing quotas). " - + "Data older than this will be cleaned up during expire-process-data execution."); - - public static final ConfigOption<Duration> EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME = - ConfigOptions.key("expire-process-data.history-data-keep-time") - .durationType() - .defaultValue(Duration.ofDays(7)) - .withDescription( - "The maximum retention time for completed process history data. " - + "Only applies when it is shorter than runtime-data-keep-time," - + " and only affects terminal process records (SUCCESS/FAILED). " - + "Active processes (RUNNING/SUBMITTED/PENDING/CANCELING) are never deleted."); - private ExecuteEngine localEngine; - private long expireProcessDataRuntimeKeepTimeMs; - private long expireProcessDataHistoryKeepTimeMs; private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap(); private final List<TableFormat> formats = Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE); @@ -168,8 +139,6 @@ public class IcebergProcessFactory implements ProcessFactory { return triggerAutoCreateTag(tableRuntime); } else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) { return triggerHiveCommitSync(tableRuntime); - } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) { - return triggerProcessDataExpiring(tableRuntime); } return Optional.empty(); @@ -201,12 +170,6 @@ public class IcebergProcessFactory implements ProcessFactory { return new TagsAutoCreatingProcess(tableRuntime, localEngine); } else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) { return new HiveCommitSyncProcess(tableRuntime, localEngine); - } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) { - return new ProcessDataExpiringProcess( - tableRuntime, - localEngine, - expireProcessDataRuntimeKeepTimeMs, - expireProcessDataHistoryKeepTimeMs); } throw new RecoverProcessFailedException( @@ -254,16 +217,6 @@ public class IcebergProcessFactory implements ProcessFactory { this.actions.put( IcebergActions.SYNC_HIVE_TABLES, ProcessTriggerStrategy.triggerAtFixRate(interval)); } - - Duration expireProcessDataInterval = - configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL); - this.actions.put( - IcebergActions.EXPIRE_PROCESS_DATA, - ProcessTriggerStrategy.triggerAtFixRate(expireProcessDataInterval)); - this.expireProcessDataRuntimeKeepTimeMs = - configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME).toMillis(); - this.expireProcessDataHistoryKeepTimeMs = - configs.getDuration(EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME).toMillis(); } private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) { @@ -348,19 +301,6 @@ public class IcebergProcessFactory implements ProcessFactory { return Optional.of(new HiveCommitSyncProcess(tableRuntime, localEngine)); } - private Optional<TableProcess> triggerProcessDataExpiring(TableRuntime tableRuntime) { - if (localEngine == null) { - return Optional.empty(); - } - - return Optional.of( - new ProcessDataExpiringProcess( - tableRuntime, - localEngine, - expireProcessDataRuntimeKeepTimeMs, - expireProcessDataHistoryKeepTimeMs)); - } - @Override public void close() {} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index 0cc084802..52c96ed1d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -22,12 +22,15 @@ import org.apache.amoro.config.Configurations; import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.table.TableService; +import java.time.Duration; + public class InlineTableExecutors { private static final InlineTableExecutors instance = new InlineTableExecutors(); private TableRuntimeRefreshExecutor tableRefreshingExecutor; private BlockerExpiringExecutor blockerExpiringExecutor; private OptimizingCommitExecutor optimizingCommitExecutor; + private ProcessDataExpiringExecutor processDataExpiringExecutor; public static InlineTableExecutors getInstance() { return instance; @@ -37,6 +40,23 @@ public class InlineTableExecutors { this.optimizingCommitExecutor = new OptimizingCommitExecutor( tableService, conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT)); + Duration optimizingKeepTime = + conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME) + ? conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME) + : Duration.ofDays( + conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS)); + Duration expireInterval = + conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL) + ? conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL) + : Duration.ofHours( + conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS)); + Duration processKeepTime = + conf.contains(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME) + ? conf.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME) + : Duration.ofDays(conf.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS)); + this.processDataExpiringExecutor = + new ProcessDataExpiringExecutor( + tableService, optimizingKeepTime, expireInterval, processKeepTime); this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService); this.tableRefreshingExecutor = new TableRuntimeRefreshExecutor( @@ -57,4 +77,8 @@ public class InlineTableExecutors { public OptimizingCommitExecutor getOptimizingCommitExecutor() { return optimizingCommitExecutor; } + + public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() { + return processDataExpiringExecutor; + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java similarity index 66% rename from amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java rename to amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java index f08614121..1bc4e9ac7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java @@ -16,71 +16,62 @@ * limitations under the License. */ -package org.apache.amoro.server.process.iceberg; +package org.apache.amoro.server.scheduler.inline; -import org.apache.amoro.Action; -import org.apache.amoro.IcebergActions; import org.apache.amoro.TableRuntime; -import org.apache.amoro.process.ExecuteEngine; -import org.apache.amoro.process.LocalProcess; -import org.apache.amoro.process.TableProcess; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.utils.SnowflakeIdGenerator; -import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; +import java.time.Duration; -/** Local table process for expiring optimizing runtime data and process history records. */ -public class ProcessDataExpiringProcess extends TableProcess implements LocalProcess { - - private static final Logger LOG = LoggerFactory.getLogger(ProcessDataExpiringProcess.class); +public class ProcessDataExpiringExecutor extends PeriodicTableScheduler { + private static final Logger LOG = LoggerFactory.getLogger(ProcessDataExpiringExecutor.class); private final Persistency persistency = new Persistency(); private final long optimizingKeepTimeMs; private final long processKeepTimeMs; + private final long expireIntervalMs; - public ProcessDataExpiringProcess( - TableRuntime tableRuntime, - ExecuteEngine engine, - long optimizingKeepTimeMs, - long processKeepTimeMs) { - super(tableRuntime, engine); - this.optimizingKeepTimeMs = optimizingKeepTimeMs; - this.processKeepTimeMs = processKeepTimeMs; + public ProcessDataExpiringExecutor( + TableService tableService, + Duration optimizingKeepTime, + Duration expireInterval, + Duration processKeepTime) { + super(tableService, 1); + this.optimizingKeepTimeMs = optimizingKeepTime.toMillis(); + this.processKeepTimeMs = processKeepTime.toMillis(); + this.expireIntervalMs = expireInterval.toMillis(); } @Override - public String tag() { - return getAction().getName().toLowerCase(); - } - - @Override - public void run() { - try { - persistency.doExpiring(tableRuntime); - } catch (Throwable t) { - LOG.error("Expiring table runtimes of {} failed.", tableRuntime.getTableIdentifier(), t); - throw new RuntimeException(t); - } + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return expireIntervalMs; } @Override - public Action getAction() { - return IcebergActions.EXPIRE_PROCESS_DATA; + protected boolean enabled(TableRuntime tableRuntime) { + return true; } @Override - public Map<String, String> getProcessParameters() { - return Maps.newHashMap(); + protected long getExecutorDelay() { + return 0; } @Override - public Map<String, String> getSummary() { - return Maps.newHashMap(); + protected void execute(TableRuntime tableRuntime) { + try { + persistency.doExpiring(tableRuntime); + } catch (Throwable throwable) { + LOG.error( + "Expiring table runtimes of {} failed.", tableRuntime.getTableIdentifier(), throwable); + } } private class Persistency extends PersistentBase { @@ -109,9 +100,8 @@ public class ProcessDataExpiringProcess extends TableProcess implements LocalPro mapper -> mapper.deleteOptimizingQuotaBefore(tableId, optimizingMinId))); // 2. Expire process history terminal records (processKeepTimeMs, e.g. 7d) - // Only deletes terminal records in the window between processKeepTime and - // optimizingKeepTime, - // since records older than optimizingKeepTime are already removed by step 1. + // Only deletes terminal records in the window between processKeepTime and keepTime, + // since records older than keepTime are already removed by step 1. if (processKeepTimeMs < optimizingKeepTimeMs) { long processMinId = SnowflakeIdGenerator.getMinSnowflakeId(now - processKeepTimeMs); doAs( diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java index 1e770e359..07898363a 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java @@ -385,6 +385,8 @@ public class AmsEnvironment { + "\n" + " self-optimizing:\n" + " commit-thread-count: 10\n" + + " runtime-data-keep-days: 30\n" + + " runtime-data-expire-interval-hours: 1\n" + " break-quota-limit-enabled: true\n" + "\n" + " database:\n" diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java index 63a308166..414f760c2 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java @@ -75,6 +75,32 @@ public class TestAmoroManagementConf { assertTimeRelatedConfigs(serviceConfig, expectedConfig); } + @Test + void testNewDurationConfigDefaults() { + Configurations serviceConfig = new Configurations(); + Assertions.assertEquals( + Duration.ofDays(30), + serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME)); + Assertions.assertEquals( + Duration.ofHours(1), + serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL)); + Assertions.assertEquals( + Duration.ofDays(7), serviceConfig.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME)); + } + + @Test + void testDeprecatedIntegerConfigDefaults() { + Configurations serviceConfig = new Configurations(); + Assertions.assertEquals( + 30, serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS)); + Assertions.assertEquals( + 1, + serviceConfig.getInteger( + AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS)); + Assertions.assertEquals( + 7, serviceConfig.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS)); + } + @Test void testParsingDefaultStorageRelatedConfigs() { Configurations serviceConfig = new Configurations(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java index e3f1c73f4..45aa9e9bf 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -55,8 +55,6 @@ public class TestIcebergProcessFactory { "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, Duration.ofHours(24)); assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA, Duration.ofHours(24)); assertSupportedAction("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, Duration.ofHours(1)); - assertSupportedAction( - "expire-process-data", IcebergActions.EXPIRE_PROCESS_DATA, Duration.ofHours(1)); } @Test @@ -75,11 +73,6 @@ public class TestIcebergProcessFactory { "auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, TagsAutoCreatingProcess.class, 0); assertTriggerWhenDue( "sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, HiveCommitSyncProcess.class, 0); - assertTriggerWhenDue( - "expire-process-data", - IcebergActions.EXPIRE_PROCESS_DATA, - ProcessDataExpiringProcess.class, - 0); } @Test @@ -141,14 +134,6 @@ public class TestIcebergProcessFactory { assertRecover("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, HiveCommitSyncProcess.class); } - @Test - public void testRecoverExpireProcessDataProcess() { - assertRecover( - "expire-process-data", - IcebergActions.EXPIRE_PROCESS_DATA, - ProcessDataExpiringProcess.class); - } - @Test public void testRecoverUnsupportedActionThrows() { IcebergProcessFactory factory = openedFactory("expire-snapshots"); @@ -195,20 +180,12 @@ public class TestIcebergProcessFactory { Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME, process.getExecutionEngine()); } - private Map<String, String> buildFactoryProperties(String configKey, String interval) { - Map<String, String> properties = new HashMap<>(); - if ("expire-process-data".equals(configKey)) { - properties.put("expire-process-data.runtime-data-expire-interval", interval); - } else { - properties.put(configKey + ".enabled", "true"); - properties.put(configKey + ".interval", interval); - } - return properties; - } - private IcebergProcessFactory openedFactory(String configKey) { IcebergProcessFactory factory = new IcebergProcessFactory(); - factory.open(buildFactoryProperties(configKey, "1h")); + Map<String, String> properties = new HashMap<>(); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", "1h"); + factory.open(properties); return factory; } @@ -216,7 +193,9 @@ public class TestIcebergProcessFactory { String configKey, org.apache.amoro.Action action, Duration interval) { IcebergProcessFactory factory = new IcebergProcessFactory(); - Map<String, String> properties = buildFactoryProperties(configKey, interval.toHours() + "h"); + Map<String, String> properties = new HashMap<>(); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", interval.toHours() + "h"); factory.open(properties); @@ -233,7 +212,10 @@ public class TestIcebergProcessFactory { String configKey, org.apache.amoro.Action action, Class<?> processClass, long lastTime) { IcebergProcessFactory factory = new IcebergProcessFactory(); - factory.open(buildFactoryProperties(configKey, "1h")); + Map<String, String> properties = new HashMap<>(); + properties.put(configKey + ".enabled", "true"); + properties.put(configKey + ".interval", "1h"); + factory.open(properties); LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class); doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java new file mode 100644 index 000000000..4a7c2895d --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.scheduler.inline; + +import org.apache.amoro.TableRuntime; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.time.Duration; + +/** Tests for configurable interval in ProcessDataExpiringExecutor. */ +public class TestConfigurableIntervalExecutors { + + @Test + public void testProcessDataExpiringDefaultInterval() { + Duration optimizingKeepTime = Duration.ofDays(30); + Duration expireInterval = Duration.ofHours(1); + Duration processKeepTime = Duration.ofDays(7); + ProcessDataExpiringExecutor executor = + new ProcessDataExpiringExecutor(null, optimizingKeepTime, expireInterval, processKeepTime); + + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Assert.assertEquals( + Duration.ofHours(1).toMillis(), executor.getNextExecutingTime(tableRuntime)); + } + + @Test + public void testProcessDataExpiringCustomInterval() { + Duration optimizingKeepTime = Duration.ofDays(15); + Duration expireInterval = Duration.ofMinutes(30); + Duration processKeepTime = Duration.ofDays(3); + ProcessDataExpiringExecutor executor = + new ProcessDataExpiringExecutor(null, optimizingKeepTime, expireInterval, processKeepTime); + + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Assert.assertEquals( + Duration.ofMinutes(30).toMillis(), executor.getNextExecutingTime(tableRuntime)); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java similarity index 87% rename from amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java rename to amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java index 2f714d8e0..8e2139ba6 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java @@ -16,17 +16,17 @@ * limitations under the License. */ -package org.apache.amoro.server.process.iceberg; +package org.apache.amoro.server.scheduler.inline; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; -import org.apache.amoro.process.LocalExecutionEngine; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.server.AMSServiceTestBase; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; import org.apache.amoro.server.process.TableProcessMeta; import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.utils.SnowflakeIdGenerator; import org.junit.Assert; import org.junit.Before; @@ -37,7 +37,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; -public class TestProcessDataExpiringProcess extends AMSServiceTestBase { +public class TestProcessDataExpiringExecutor extends AMSServiceTestBase { private static final long TABLE_ID = 1L; private static final ServerTableIdentifier TABLE_IDENTIFIER = @@ -46,13 +46,12 @@ public class TestProcessDataExpiringProcess extends AMSServiceTestBase { private final Persistency persistency = new Persistency(); private DefaultTableRuntime tableRuntime; - private LocalExecutionEngine engine; + private TableService tableService; @Before public void mock() { tableRuntime = Mockito.mock(DefaultTableRuntime.class); - engine = Mockito.mock(LocalExecutionEngine.class); - Mockito.when(engine.name()).thenReturn(LocalExecutionEngine.ENGINE_NAME); + tableService = Mockito.mock(TableService.class); Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(TABLE_IDENTIFIER); // Clean up any leftover data persistency.cleanAll(TABLE_ID); @@ -62,6 +61,7 @@ public class TestProcessDataExpiringProcess extends AMSServiceTestBase { public void testProcessHistoryExpiringWhenShorterThanKeepTime() { // optimizingKeepTime=30d, processKeepTime=7d Duration optimizingKeepTime = Duration.ofDays(30); + Duration expireInterval = Duration.ofHours(1); Duration processKeepTime = Duration.ofDays(7); long now = System.currentTimeMillis(); @@ -78,10 +78,10 @@ public class TestProcessDataExpiringProcess extends AMSServiceTestBase { Assert.assertEquals(2, persistency.listProcesses(TABLE_ID).size()); - ProcessDataExpiringProcess process = - new ProcessDataExpiringProcess( - tableRuntime, engine, optimizingKeepTime.toMillis(), processKeepTime.toMillis()); - process.run(); + ProcessDataExpiringExecutor executor = + new ProcessDataExpiringExecutor( + tableService, optimizingKeepTime, expireInterval, processKeepTime); + executor.execute(tableRuntime); List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID); Assert.assertEquals(1, remaining.size()); @@ -92,6 +92,7 @@ public class TestProcessDataExpiringProcess extends AMSServiceTestBase { public void testProcessHistoryNotExpiringWhenEqualToKeepTime() { // When processKeepTime >= optimizingKeepTime, the extra process cleanup should not trigger Duration optimizingKeepTime = Duration.ofDays(7); + Duration expireInterval = Duration.ofHours(1); Duration processKeepTime = Duration.ofDays(7); long now = System.currentTimeMillis(); @@ -103,10 +104,10 @@ public class TestProcessDataExpiringProcess extends AMSServiceTestBase { Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size()); - ProcessDataExpiringProcess process = - new ProcessDataExpiringProcess( - tableRuntime, engine, optimizingKeepTime.toMillis(), processKeepTime.toMillis()); - process.run(); + ProcessDataExpiringExecutor executor = + new ProcessDataExpiringExecutor( + tableService, optimizingKeepTime, expireInterval, processKeepTime); + executor.execute(tableRuntime); // The process should still exist - not expired by either mechanism Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size()); @@ -116,6 +117,7 @@ public class TestProcessDataExpiringProcess extends AMSServiceTestBase { public void testDeleteExpiredProcessesSkipsActiveStatuses() { // optimizingKeepTime=30d, processKeepTime=7d Duration optimizingKeepTime = Duration.ofDays(30); + Duration expireInterval = Duration.ofHours(1); Duration processKeepTime = Duration.ofDays(7); long now = System.currentTimeMillis(); @@ -143,10 +145,10 @@ public class TestProcessDataExpiringProcess extends AMSServiceTestBase { Assert.assertEquals(6, persistency.listProcesses(TABLE_ID).size()); - ProcessDataExpiringProcess process = - new ProcessDataExpiringProcess( - tableRuntime, engine, optimizingKeepTime.toMillis(), processKeepTime.toMillis()); - process.run(); + ProcessDataExpiringExecutor executor = + new ProcessDataExpiringExecutor( + tableService, optimizingKeepTime, expireInterval, processKeepTime); + executor.execute(tableRuntime); List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID); // Active statuses (RUNNING, SUBMITTED, PENDING, CANCELING) should survive diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 89bdd055a..b454bc9c1 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -31,5 +31,4 @@ public class IcebergActions { public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots"); public static final Action CLEAN_DANGLING_DELETE = Action.register("clean-dangling-delete-files"); public static final Action AUTO_CREATE_TAGS = Action.register("auto-create-tags"); - public static final Action EXPIRE_PROCESS_DATA = Action.register("expire-process-data"); } diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java index f3568cf03..fb91cb63b 100644 --- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java @@ -51,7 +51,6 @@ public class TestLocalExecutionEngine { assertCustomPoolByTag("expire-data"); assertCustomPoolByTag("auto-create-tags"); assertCustomPoolByTag("sync-hive-tables"); - assertCustomPoolByTag("expire-process-data"); } private void assertCustomPoolByTag(String tag) throws Exception { @@ -163,7 +162,6 @@ public class TestLocalExecutionEngine { properties.put("pool.expire-data.thread-count", "1"); properties.put("pool.auto-create-tags.thread-count", "1"); properties.put("pool.sync-hive-tables.thread-count", "1"); - properties.put("pool.expire-process-data.thread-count", "1"); properties.put("process.status.ttl", ttl); localEngine.open(properties); return localEngine; diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index ae2ee8d39..f4cdaaac8 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -81,8 +81,13 @@ ams: interval: 1min # 60000 max-pending-partition-count: 100 # default 100 + process: + history-data-keep-time: 7d + self-optimizing: commit-thread-count: 10 + runtime-data-keep-time: 30d + runtime-data-expire-interval: 1h refresh-group-interval: 30s break-quota-limit-enabled: true diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml index dcb5fd934..14ebcc4e9 100755 --- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -26,5 +26,4 @@ execute-engines: pool.clean-dangling-delete-files.thread-count: 10 pool.expire-data.thread-count: 10 pool.auto-create-tags.thread-count: 3 - pool.sync-hive-tables.thread-count: 10 - pool.expire-process-data.thread-count: 1 \ No newline at end of file + pool.sync-hive-tables.thread-count: 10 \ No newline at end of file diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml index 6bb8c3830..1085d364f 100755 --- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -32,6 +32,3 @@ process-factories: auto-create-tags.interval: "1min" sync-hive-tables.enabled: "false" sync-hive-tables.interval: "10min" - expire-process-data.runtime-data-expire-interval: "1h" - expire-process-data.runtime-data-keep-time: "30d" - expire-process-data.history-data-keep-time: "7d" diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md index 1217567bb..14f32149f 100644 --- a/docs/admin-guides/deployment.md +++ b/docs/admin-guides/deployment.md @@ -282,9 +282,6 @@ process-factories: auto-create-tags.interval: "1m" # interval for auto creating tags sync-hive-tables.enabled: "false" # enable synchronizing Hive tables sync-hive-tables.interval: "10min" # interval for synchronizing Hive tables - expire-process-data.runtime-data-expire-interval: "1h" # interval for expiring process data - expire-process-data.runtime-data-keep-time: "30d" # duration to keep optimizing runtime data - expire-process-data.history-data-keep-time: "7d" # duration to keep terminal process history records ``` {{< hint info >}} @@ -319,7 +316,6 @@ execute-engines: pool.expire-data.thread-count: 10 # thread pool for data expiration pool.auto-create-tags.thread-count: 3 # thread pool for auto creating tags pool.sync-hive-tables.thread-count: 10 # thread pool for synchronizing Hive tables - pool.expire-process-data.thread-count: 1 # thread pool for expiring process data process.status.ttl: 4h # TTL for process status cache ``` diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index 96c630070..2a6c6b6e8 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -97,6 +97,8 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | optimizer.task-execute-timeout | 2147483647 s | Timeout duration for task execution, default to Integer.MAX_VALUE seconds(about 24,855 days). | | overview-cache.max-size | 3360 | Max size of overview cache. | | overview-cache.refresh-interval | 3 min | Interval for refreshing overview cache. | +| process.history-data-keep-days | 7 | Deprecated: use 'process.history-data-keep-time' instead. The number of days that process history data is retained. | +| process.history-data-keep-time | 7 d | Duration that process history data is retained. Expired terminal process records will be deleted automatically. | | refresh-external-catalogs.interval | 3 min | Interval to refresh the external catalog. | | refresh-external-catalogs.queue-size | 1000000 | The queue size of the executors of the external catalog explorer. | | refresh-external-catalogs.thread-count | 10 | The number of threads used for discovering tables in external catalogs. | @@ -108,6 +110,10 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | self-optimizing.commit-thread-count | 10 | The number of threads that self-optimizing uses to submit results. | | self-optimizing.plan-manifest-io-thread-count | 10 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning operations. | | self-optimizing.refresh-group-interval | 30 s | Optimizer group refresh interval. | +| self-optimizing.runtime-data-expire-interval | 1 h | Interval between self-optimizing runtime data expiration runs. | +| self-optimizing.runtime-data-expire-interval-hours | 1 | Deprecated: use 'self-optimizing.runtime-data-expire-interval' instead. The number of hours that self-optimizing runtime data expire interval. | +| self-optimizing.runtime-data-keep-days | 30 | Deprecated: use 'self-optimizing.runtime-data-keep-time' instead. The number of days that self-optimizing runtime data keeps the runtime. | +| self-optimizing.runtime-data-keep-time | 30 d | Duration that self-optimizing runtime data is retained. | | server-bind-host | 0.0.0.0 | The host bound to the server. | | server-expose-host | | The exposed host of the server. | | table-manifest-io.thread-count | 20 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning or commit operations. |
