This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 374ed06bd Revert "[AMORO-4246] Refactor process data expiring via
ProcessFactory plugin" (#4252)
374ed06bd is described below
commit 374ed06bdb3d2de20856f6e2f13b4a5c49cc4757
Author: ZhouJinsong <[email protected]>
AuthorDate: Mon Jun 15 10:56:17 2026 +0800
Revert "[AMORO-4246] Refactor process data expiring via ProcessFactory
plugin" (#4252)
Revert "[AMORO-4246] Refactor process data expiring via ProcessFactory
plugin…"
This reverts commit acf8ef47a3d9305ef217399a9aeb8c6d312bbe75.
---
.../apache/amoro/server/AmoroManagementConf.java | 50 +++++++++++++++
.../apache/amoro/server/AmoroServiceContainer.java | 1 +
.../process/iceberg/IcebergProcessFactory.java | 60 ------------------
.../scheduler/inline/InlineTableExecutors.java | 24 ++++++++
.../inline/ProcessDataExpiringExecutor.java} | 72 ++++++++++------------
.../org/apache/amoro/server/AmsEnvironment.java | 2 +
.../amoro/server/TestAmoroManagementConf.java | 26 ++++++++
.../process/iceberg/TestIcebergProcessFactory.java | 40 ++++--------
.../inline/TestConfigurableIntervalExecutors.java | 56 +++++++++++++++++
.../inline/TestProcessDataExpiringExecutor.java} | 38 ++++++------
.../main/java/org/apache/amoro/IcebergActions.java | 1 -
.../amoro/process/TestLocalExecutionEngine.java | 2 -
dist/src/main/amoro-bin/conf/config.yaml | 5 ++
.../amoro-bin/conf/plugins/execute-engines.yaml | 3 +-
.../amoro-bin/conf/plugins/process-factories.yaml | 3 -
docs/admin-guides/deployment.md | 4 --
docs/configuration/ams-config.md | 6 ++
17 files changed, 233 insertions(+), 160 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 2ac6f5cfb..f38f6f5ac 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -395,6 +395,38 @@ public class AmoroManagementConf {
.defaultValue(10)
.withDescription("The number of threads that self-optimizing uses to submit results.");
+ /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_KEEP_TIME} instead. */
+ @Deprecated
+ public static final ConfigOption<Integer> OPTIMIZING_RUNTIME_DATA_KEEP_DAYS =
+ ConfigOptions.key("self-optimizing.runtime-data-keep-days")
+ .intType()
+ .defaultValue(30)
+ .withDescription(
+ "Deprecated: use 'self-optimizing.runtime-data-keep-time' instead. "
+ + "The number of days that self-optimizing runtime data keeps the runtime.");
+
+ /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL} instead. */
+ @Deprecated
+ public static final ConfigOption<Integer>
OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS =
+ ConfigOptions.key("self-optimizing.runtime-data-expire-interval-hours")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "Deprecated: use 'self-optimizing.runtime-data-expire-interval' instead. "
+ + "The number of hours that self-optimizing runtime data expire interval.");
+
+ public static final ConfigOption<Duration> OPTIMIZING_RUNTIME_DATA_KEEP_TIME
=
+ ConfigOptions.key("self-optimizing.runtime-data-keep-time")
+ .durationType()
+ .defaultValue(Duration.ofDays(30))
+ .withDescription("Duration that self-optimizing runtime data is retained.");
+
+ public static final ConfigOption<Duration>
OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL =
+ ConfigOptions.key("self-optimizing.runtime-data-expire-interval")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription("Interval between self-optimizing runtime data expiration runs.");
+
public static final ConfigOption<Boolean>
OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED =
ConfigOptions.key("self-optimizing.break-quota-limit-enabled")
.booleanType()
@@ -402,6 +434,24 @@ public class AmoroManagementConf {
.withDescription(
"Allow the table to break the quota limit when the resource is sufficient.");
+ /** @deprecated Use {@link #PROCESS_HISTORY_DATA_KEEP_TIME} instead. */
+ @Deprecated
+ public static final ConfigOption<Integer> PROCESS_HISTORY_DATA_KEEP_DAYS =
+ ConfigOptions.key("process.history-data-keep-days")
+ .intType()
+ .defaultValue(7)
+ .withDescription(
+ "Deprecated: use 'process.history-data-keep-time' instead. "
+ + "The number of days that process history data is retained.");
+
+ public static final ConfigOption<Duration> PROCESS_HISTORY_DATA_KEEP_TIME =
+ ConfigOptions.key("process.history-data-keep-time")
+ .durationType()
+ .defaultValue(Duration.ofDays(7))
+ .withDescription(
+ "Duration that process history data is retained. "
+ + "Expired terminal process records will be deleted automatically.");
+
public static final ConfigOption<Duration> OVERVIEW_CACHE_REFRESH_INTERVAL =
ConfigOptions.key("overview-cache.refresh-interval")
.durationType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index c717d71b8..638354983 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -290,6 +290,7 @@ public class AmoroServiceContainer {
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(processService.getTableHandlerChain());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
+
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
tableService.initialize();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 9f9f901de..6df589cc9 100755
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -95,36 +95,7 @@ public class IcebergProcessFactory implements ProcessFactory
{
.durationType()
.defaultValue(Duration.ofMinutes(10));
- public static final ConfigOption<Duration>
EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL =
- ConfigOptions.key("expire-process-data.runtime-data-expire-interval")
- .durationType()
- .defaultValue(Duration.ofHours(1))
- .withDescription(
- "The interval for expiring process runtime data, "
- + "including optimizing runtime data and completed process metadata.");
-
- public static final ConfigOption<Duration>
EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME =
- ConfigOptions.key("expire-process-data.runtime-data-keep-time")
- .durationType()
- .defaultValue(Duration.ofDays(30))
- .withDescription(
- "The maximum retention time for process runtime data "
- + "(e.g., process records, process states, task runtimes, optimizing quotas). "
- + "Data older than this will be cleaned up during expire-process-data execution.");
-
- public static final ConfigOption<Duration>
EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME =
- ConfigOptions.key("expire-process-data.history-data-keep-time")
- .durationType()
- .defaultValue(Duration.ofDays(7))
- .withDescription(
- "The maximum retention time for completed process history data. "
- + "Only applies when it is shorter than runtime-data-keep-time,"
- + " and only affects terminal process records (SUCCESS/FAILED). "
- + "Active processes (RUNNING/SUBMITTED/PENDING/CANCELING) are never deleted.");
-
private ExecuteEngine localEngine;
- private long expireProcessDataRuntimeKeepTimeMs;
- private long expireProcessDataHistoryKeepTimeMs;
private final Map<Action, ProcessTriggerStrategy> actions =
Maps.newHashMap();
private final List<TableFormat> formats =
Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE);
@@ -168,8 +139,6 @@ public class IcebergProcessFactory implements
ProcessFactory {
return triggerAutoCreateTag(tableRuntime);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return triggerHiveCommitSync(tableRuntime);
- } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) {
- return triggerProcessDataExpiring(tableRuntime);
}
return Optional.empty();
@@ -201,12 +170,6 @@ public class IcebergProcessFactory implements
ProcessFactory {
return new TagsAutoCreatingProcess(tableRuntime, localEngine);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return new HiveCommitSyncProcess(tableRuntime, localEngine);
- } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) {
- return new ProcessDataExpiringProcess(
- tableRuntime,
- localEngine,
- expireProcessDataRuntimeKeepTimeMs,
- expireProcessDataHistoryKeepTimeMs);
}
throw new RecoverProcessFailedException(
@@ -254,16 +217,6 @@ public class IcebergProcessFactory implements
ProcessFactory {
this.actions.put(
IcebergActions.SYNC_HIVE_TABLES,
ProcessTriggerStrategy.triggerAtFixRate(interval));
}
-
- Duration expireProcessDataInterval =
- configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL);
- this.actions.put(
- IcebergActions.EXPIRE_PROCESS_DATA,
- ProcessTriggerStrategy.triggerAtFixRate(expireProcessDataInterval));
- this.expireProcessDataRuntimeKeepTimeMs =
-
configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME).toMillis();
- this.expireProcessDataHistoryKeepTimeMs =
-
configs.getDuration(EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME).toMillis();
}
private Optional<TableProcess> triggerExpireSnapshot(TableRuntime
tableRuntime) {
@@ -348,19 +301,6 @@ public class IcebergProcessFactory implements
ProcessFactory {
return Optional.of(new HiveCommitSyncProcess(tableRuntime, localEngine));
}
- private Optional<TableProcess> triggerProcessDataExpiring(TableRuntime
tableRuntime) {
- if (localEngine == null) {
- return Optional.empty();
- }
-
- return Optional.of(
- new ProcessDataExpiringProcess(
- tableRuntime,
- localEngine,
- expireProcessDataRuntimeKeepTimeMs,
- expireProcessDataHistoryKeepTimeMs));
- }
-
@Override
public void close() {}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 0cc084802..52c96ed1d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -22,12 +22,15 @@ import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.table.TableService;
+import java.time.Duration;
+
public class InlineTableExecutors {
private static final InlineTableExecutors instance = new
InlineTableExecutors();
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
+ private ProcessDataExpiringExecutor processDataExpiringExecutor;
public static InlineTableExecutors getInstance() {
return instance;
@@ -37,6 +40,23 @@ public class InlineTableExecutors {
this.optimizingCommitExecutor =
new OptimizingCommitExecutor(
tableService,
conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT));
+ Duration optimizingKeepTime =
+ conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME)
+ ? conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME)
+ : Duration.ofDays(
+
conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS));
+ Duration expireInterval =
+
conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL)
+ ?
conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL)
+ : Duration.ofHours(
+
conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS));
+ Duration processKeepTime =
+ conf.contains(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME)
+ ? conf.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME)
+ :
Duration.ofDays(conf.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS));
+ this.processDataExpiringExecutor =
+ new ProcessDataExpiringExecutor(
+ tableService, optimizingKeepTime, expireInterval, processKeepTime);
this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService);
this.tableRefreshingExecutor =
new TableRuntimeRefreshExecutor(
@@ -57,4 +77,8 @@ public class InlineTableExecutors {
public OptimizingCommitExecutor getOptimizingCommitExecutor() {
return optimizingCommitExecutor;
}
+
+ public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() {
+ return processDataExpiringExecutor;
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
similarity index 66%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
rename to
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
index f08614121..1bc4e9ac7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
@@ -16,71 +16,62 @@
* limitations under the License.
*/
-package org.apache.amoro.server.process.iceberg;
+package org.apache.amoro.server.scheduler.inline;
-import org.apache.amoro.Action;
-import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableRuntime;
-import org.apache.amoro.process.ExecuteEngine;
-import org.apache.amoro.process.LocalProcess;
-import org.apache.amoro.process.TableProcess;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
+import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
+import java.time.Duration;
-/** Local table process for expiring optimizing runtime data and process history records. */
-public class ProcessDataExpiringProcess extends TableProcess implements
LocalProcess {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ProcessDataExpiringProcess.class);
+public class ProcessDataExpiringExecutor extends PeriodicTableScheduler {
+ private static final Logger LOG =
LoggerFactory.getLogger(ProcessDataExpiringExecutor.class);
private final Persistency persistency = new Persistency();
private final long optimizingKeepTimeMs;
private final long processKeepTimeMs;
+ private final long expireIntervalMs;
- public ProcessDataExpiringProcess(
- TableRuntime tableRuntime,
- ExecuteEngine engine,
- long optimizingKeepTimeMs,
- long processKeepTimeMs) {
- super(tableRuntime, engine);
- this.optimizingKeepTimeMs = optimizingKeepTimeMs;
- this.processKeepTimeMs = processKeepTimeMs;
+ public ProcessDataExpiringExecutor(
+ TableService tableService,
+ Duration optimizingKeepTime,
+ Duration expireInterval,
+ Duration processKeepTime) {
+ super(tableService, 1);
+ this.optimizingKeepTimeMs = optimizingKeepTime.toMillis();
+ this.processKeepTimeMs = processKeepTime.toMillis();
+ this.expireIntervalMs = expireInterval.toMillis();
}
@Override
- public String tag() {
- return getAction().getName().toLowerCase();
- }
-
- @Override
- public void run() {
- try {
- persistency.doExpiring(tableRuntime);
- } catch (Throwable t) {
- LOG.error("Expiring table runtimes of {} failed.",
tableRuntime.getTableIdentifier(), t);
- throw new RuntimeException(t);
- }
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
+ return expireIntervalMs;
}
@Override
- public Action getAction() {
- return IcebergActions.EXPIRE_PROCESS_DATA;
+ protected boolean enabled(TableRuntime tableRuntime) {
+ return true;
}
@Override
- public Map<String, String> getProcessParameters() {
- return Maps.newHashMap();
+ protected long getExecutorDelay() {
+ return 0;
}
@Override
- public Map<String, String> getSummary() {
- return Maps.newHashMap();
+ protected void execute(TableRuntime tableRuntime) {
+ try {
+ persistency.doExpiring(tableRuntime);
+ } catch (Throwable throwable) {
+ LOG.error(
+ "Expiring table runtimes of {} failed.",
tableRuntime.getTableIdentifier(), throwable);
+ }
}
private class Persistency extends PersistentBase {
@@ -109,9 +100,8 @@ public class ProcessDataExpiringProcess extends
TableProcess implements LocalPro
mapper -> mapper.deleteOptimizingQuotaBefore(tableId,
optimizingMinId)));
// 2. Expire process history terminal records (processKeepTimeMs, e.g. 7d)
- // Only deletes terminal records in the window between processKeepTime and
- // optimizingKeepTime,
- // since records older than optimizingKeepTime are already removed by step 1.
+ // Only deletes terminal records in the window between processKeepTime and keepTime,
+ // since records older than keepTime are already removed by step 1.
if (processKeepTimeMs < optimizingKeepTimeMs) {
long processMinId = SnowflakeIdGenerator.getMinSnowflakeId(now -
processKeepTimeMs);
doAs(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index 1e770e359..07898363a 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -385,6 +385,8 @@ public class AmsEnvironment {
+ "\n"
+ " self-optimizing:\n"
+ " commit-thread-count: 10\n"
+ + " runtime-data-keep-days: 30\n"
+ + " runtime-data-expire-interval-hours: 1\n"
+ " break-quota-limit-enabled: true\n"
+ "\n"
+ " database:\n"
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
index 63a308166..414f760c2 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
@@ -75,6 +75,32 @@ public class TestAmoroManagementConf {
assertTimeRelatedConfigs(serviceConfig, expectedConfig);
}
+ @Test
+ void testNewDurationConfigDefaults() {
+ Configurations serviceConfig = new Configurations();
+ Assertions.assertEquals(
+ Duration.ofDays(30),
+
serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME));
+ Assertions.assertEquals(
+ Duration.ofHours(1),
+
serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL));
+ Assertions.assertEquals(
+ Duration.ofDays(7),
serviceConfig.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME));
+ }
+
+ @Test
+ void testDeprecatedIntegerConfigDefaults() {
+ Configurations serviceConfig = new Configurations();
+ Assertions.assertEquals(
+ 30,
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS));
+ Assertions.assertEquals(
+ 1,
+ serviceConfig.getInteger(
+
AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS));
+ Assertions.assertEquals(
+ 7,
serviceConfig.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS));
+ }
+
@Test
void testParsingDefaultStorageRelatedConfigs() {
Configurations serviceConfig = new Configurations();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index e3f1c73f4..45aa9e9bf 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -55,8 +55,6 @@ public class TestIcebergProcessFactory {
"clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE,
Duration.ofHours(24));
assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA,
Duration.ofHours(24));
assertSupportedAction("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES,
Duration.ofHours(1));
- assertSupportedAction(
- "expire-process-data", IcebergActions.EXPIRE_PROCESS_DATA,
Duration.ofHours(1));
}
@Test
@@ -75,11 +73,6 @@ public class TestIcebergProcessFactory {
"auto-create-tags", IcebergActions.AUTO_CREATE_TAGS,
TagsAutoCreatingProcess.class, 0);
assertTriggerWhenDue(
"sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES,
HiveCommitSyncProcess.class, 0);
- assertTriggerWhenDue(
- "expire-process-data",
- IcebergActions.EXPIRE_PROCESS_DATA,
- ProcessDataExpiringProcess.class,
- 0);
}
@Test
@@ -141,14 +134,6 @@ public class TestIcebergProcessFactory {
assertRecover("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES,
HiveCommitSyncProcess.class);
}
- @Test
- public void testRecoverExpireProcessDataProcess() {
- assertRecover(
- "expire-process-data",
- IcebergActions.EXPIRE_PROCESS_DATA,
- ProcessDataExpiringProcess.class);
- }
-
@Test
public void testRecoverUnsupportedActionThrows() {
IcebergProcessFactory factory = openedFactory("expire-snapshots");
@@ -195,20 +180,12 @@ public class TestIcebergProcessFactory {
Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME,
process.getExecutionEngine());
}
- private Map<String, String> buildFactoryProperties(String configKey, String
interval) {
- Map<String, String> properties = new HashMap<>();
- if ("expire-process-data".equals(configKey)) {
- properties.put("expire-process-data.runtime-data-expire-interval",
interval);
- } else {
- properties.put(configKey + ".enabled", "true");
- properties.put(configKey + ".interval", interval);
- }
- return properties;
- }
-
private IcebergProcessFactory openedFactory(String configKey) {
IcebergProcessFactory factory = new IcebergProcessFactory();
- factory.open(buildFactoryProperties(configKey, "1h"));
+ Map<String, String> properties = new HashMap<>();
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", "1h");
+ factory.open(properties);
return factory;
}
@@ -216,7 +193,9 @@ public class TestIcebergProcessFactory {
String configKey, org.apache.amoro.Action action, Duration interval) {
IcebergProcessFactory factory = new IcebergProcessFactory();
- Map<String, String> properties = buildFactoryProperties(configKey,
interval.toHours() + "h");
+ Map<String, String> properties = new HashMap<>();
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", interval.toHours() + "h");
factory.open(properties);
@@ -233,7 +212,10 @@ public class TestIcebergProcessFactory {
String configKey, org.apache.amoro.Action action, Class<?> processClass,
long lastTime) {
IcebergProcessFactory factory = new IcebergProcessFactory();
- factory.open(buildFactoryProperties(configKey, "1h"));
+ Map<String, String> properties = new HashMap<>();
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", "1h");
+ factory.open(properties);
LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
new file mode 100644
index 000000000..4a7c2895d
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.TableRuntime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+
+/** Tests for configurable interval in ProcessDataExpiringExecutor. */
+public class TestConfigurableIntervalExecutors {
+
+ @Test
+ public void testProcessDataExpiringDefaultInterval() {
+ Duration optimizingKeepTime = Duration.ofDays(30);
+ Duration expireInterval = Duration.ofHours(1);
+ Duration processKeepTime = Duration.ofDays(7);
+ ProcessDataExpiringExecutor executor =
+ new ProcessDataExpiringExecutor(null, optimizingKeepTime,
expireInterval, processKeepTime);
+
+ TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
+ Assert.assertEquals(
+ Duration.ofHours(1).toMillis(),
executor.getNextExecutingTime(tableRuntime));
+ }
+
+ @Test
+ public void testProcessDataExpiringCustomInterval() {
+ Duration optimizingKeepTime = Duration.ofDays(15);
+ Duration expireInterval = Duration.ofMinutes(30);
+ Duration processKeepTime = Duration.ofDays(3);
+ ProcessDataExpiringExecutor executor =
+ new ProcessDataExpiringExecutor(null, optimizingKeepTime,
expireInterval, processKeepTime);
+
+ TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
+ Assert.assertEquals(
+ Duration.ofMinutes(30).toMillis(),
executor.getNextExecutingTime(tableRuntime));
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
similarity index 87%
rename from
amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
rename to
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
index 2f714d8e0..8e2139ba6 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package org.apache.amoro.server.process.iceberg;
+package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
-import org.apache.amoro.process.LocalExecutionEngine;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.AMSServiceTestBase;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.junit.Assert;
import org.junit.Before;
@@ -37,7 +37,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.List;
-public class TestProcessDataExpiringProcess extends AMSServiceTestBase {
+public class TestProcessDataExpiringExecutor extends AMSServiceTestBase {
private static final long TABLE_ID = 1L;
private static final ServerTableIdentifier TABLE_IDENTIFIER =
@@ -46,13 +46,12 @@ public class TestProcessDataExpiringProcess extends
AMSServiceTestBase {
private final Persistency persistency = new Persistency();
private DefaultTableRuntime tableRuntime;
- private LocalExecutionEngine engine;
+ private TableService tableService;
@Before
public void mock() {
tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- engine = Mockito.mock(LocalExecutionEngine.class);
- Mockito.when(engine.name()).thenReturn(LocalExecutionEngine.ENGINE_NAME);
+ tableService = Mockito.mock(TableService.class);
Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(TABLE_IDENTIFIER);
// Clean up any leftover data
persistency.cleanAll(TABLE_ID);
@@ -62,6 +61,7 @@ public class TestProcessDataExpiringProcess extends
AMSServiceTestBase {
public void testProcessHistoryExpiringWhenShorterThanKeepTime() {
// optimizingKeepTime=30d, processKeepTime=7d
Duration optimizingKeepTime = Duration.ofDays(30);
+ Duration expireInterval = Duration.ofHours(1);
Duration processKeepTime = Duration.ofDays(7);
long now = System.currentTimeMillis();
@@ -78,10 +78,10 @@ public class TestProcessDataExpiringProcess extends
AMSServiceTestBase {
Assert.assertEquals(2, persistency.listProcesses(TABLE_ID).size());
- ProcessDataExpiringProcess process =
- new ProcessDataExpiringProcess(
- tableRuntime, engine, optimizingKeepTime.toMillis(),
processKeepTime.toMillis());
- process.run();
+ ProcessDataExpiringExecutor executor =
+ new ProcessDataExpiringExecutor(
+ tableService, optimizingKeepTime, expireInterval, processKeepTime);
+ executor.execute(tableRuntime);
List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID);
Assert.assertEquals(1, remaining.size());
@@ -92,6 +92,7 @@ public class TestProcessDataExpiringProcess extends
AMSServiceTestBase {
public void testProcessHistoryNotExpiringWhenEqualToKeepTime() {
// When processKeepTime >= optimizingKeepTime, the extra process cleanup should not trigger
Duration optimizingKeepTime = Duration.ofDays(7);
+ Duration expireInterval = Duration.ofHours(1);
Duration processKeepTime = Duration.ofDays(7);
long now = System.currentTimeMillis();
@@ -103,10 +104,10 @@ public class TestProcessDataExpiringProcess extends
AMSServiceTestBase {
Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size());
- ProcessDataExpiringProcess process =
- new ProcessDataExpiringProcess(
- tableRuntime, engine, optimizingKeepTime.toMillis(),
processKeepTime.toMillis());
- process.run();
+ ProcessDataExpiringExecutor executor =
+ new ProcessDataExpiringExecutor(
+ tableService, optimizingKeepTime, expireInterval, processKeepTime);
+ executor.execute(tableRuntime);
// The process should still exist - not expired by either mechanism
Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size());
@@ -116,6 +117,7 @@ public class TestProcessDataExpiringProcess extends
AMSServiceTestBase {
public void testDeleteExpiredProcessesSkipsActiveStatuses() {
// optimizingKeepTime=30d, processKeepTime=7d
Duration optimizingKeepTime = Duration.ofDays(30);
+ Duration expireInterval = Duration.ofHours(1);
Duration processKeepTime = Duration.ofDays(7);
long now = System.currentTimeMillis();
@@ -143,10 +145,10 @@ public class TestProcessDataExpiringProcess extends
AMSServiceTestBase {
Assert.assertEquals(6, persistency.listProcesses(TABLE_ID).size());
- ProcessDataExpiringProcess process =
- new ProcessDataExpiringProcess(
- tableRuntime, engine, optimizingKeepTime.toMillis(),
processKeepTime.toMillis());
- process.run();
+ ProcessDataExpiringExecutor executor =
+ new ProcessDataExpiringExecutor(
+ tableService, optimizingKeepTime, expireInterval, processKeepTime);
+ executor.execute(tableRuntime);
List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID);
// Active statuses (RUNNING, SUBMITTED, PENDING, CANCELING) should survive
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index 89bdd055a..b454bc9c1 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -31,5 +31,4 @@ public class IcebergActions {
public static final Action EXPIRE_SNAPSHOTS =
Action.register("expire-snapshots");
public static final Action CLEAN_DANGLING_DELETE =
Action.register("clean-dangling-delete-files");
public static final Action AUTO_CREATE_TAGS =
Action.register("auto-create-tags");
- public static final Action EXPIRE_PROCESS_DATA =
Action.register("expire-process-data");
}
diff --git
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
index f3568cf03..fb91cb63b 100644
---
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
+++
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -51,7 +51,6 @@ public class TestLocalExecutionEngine {
assertCustomPoolByTag("expire-data");
assertCustomPoolByTag("auto-create-tags");
assertCustomPoolByTag("sync-hive-tables");
- assertCustomPoolByTag("expire-process-data");
}
private void assertCustomPoolByTag(String tag) throws Exception {
@@ -163,7 +162,6 @@ public class TestLocalExecutionEngine {
properties.put("pool.expire-data.thread-count", "1");
properties.put("pool.auto-create-tags.thread-count", "1");
properties.put("pool.sync-hive-tables.thread-count", "1");
- properties.put("pool.expire-process-data.thread-count", "1");
properties.put("process.status.ttl", ttl);
localEngine.open(properties);
return localEngine;
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index ae2ee8d39..f4cdaaac8 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -81,8 +81,13 @@ ams:
interval: 1min # 60000
max-pending-partition-count: 100 # default 100
+ process:
+ history-data-keep-time: 7d
+
self-optimizing:
commit-thread-count: 10
+ runtime-data-keep-time: 30d
+ runtime-data-expire-interval: 1h
refresh-group-interval: 30s
break-quota-limit-enabled: true
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index dcb5fd934..14ebcc4e9 100755
--- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -26,5 +26,4 @@ execute-engines:
pool.clean-dangling-delete-files.thread-count: 10
pool.expire-data.thread-count: 10
pool.auto-create-tags.thread-count: 3
- pool.sync-hive-tables.thread-count: 10
- pool.expire-process-data.thread-count: 1
\ No newline at end of file
+ pool.sync-hive-tables.thread-count: 10
\ No newline at end of file
diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index 6bb8c3830..1085d364f 100755
--- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -32,6 +32,3 @@ process-factories:
auto-create-tags.interval: "1min"
sync-hive-tables.enabled: "false"
sync-hive-tables.interval: "10min"
- expire-process-data.runtime-data-expire-interval: "1h"
- expire-process-data.runtime-data-keep-time: "30d"
- expire-process-data.history-data-keep-time: "7d"
diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md
index 1217567bb..14f32149f 100644
--- a/docs/admin-guides/deployment.md
+++ b/docs/admin-guides/deployment.md
@@ -282,9 +282,6 @@ process-factories:
auto-create-tags.interval: "1m" # interval for auto creating tags
sync-hive-tables.enabled: "false" # enable synchronizing
Hive tables
sync-hive-tables.interval: "10min" # interval for
synchronizing Hive tables
- expire-process-data.runtime-data-expire-interval: "1h" #
interval for expiring process data
- expire-process-data.runtime-data-keep-time: "30d" # duration to keep
optimizing runtime data
- expire-process-data.history-data-keep-time: "7d" # duration to
keep terminal process history records
```
{{< hint info >}}
@@ -319,7 +316,6 @@ execute-engines:
pool.expire-data.thread-count: 10 # thread pool for data
expiration
pool.auto-create-tags.thread-count: 3 # thread pool for auto
creating tags
pool.sync-hive-tables.thread-count: 10 # thread pool for
synchronizing Hive tables
- pool.expire-process-data.thread-count: 1 # thread pool for expiring
process data
process.status.ttl: 4h # TTL for process status
cache
```
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 96c630070..2a6c6b6e8 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -97,6 +97,8 @@ table td:last-child, table th:last-child { width: 40%;
word-break: break-all; }
| optimizer.task-execute-timeout | 2147483647 s | Timeout duration for task
execution, default to Integer.MAX_VALUE seconds(about 24,855 days). |
| overview-cache.max-size | 3360 | Max size of overview cache. |
| overview-cache.refresh-interval | 3 min | Interval for refreshing overview
cache. |
+| process.history-data-keep-days | 7 | Deprecated: use
'process.history-data-keep-time' instead. The number of days that process
history data is retained. |
+| process.history-data-keep-time | 7 d | Duration that process history data is
retained. Expired terminal process records will be deleted automatically. |
| refresh-external-catalogs.interval | 3 min | Interval to refresh the
external catalog. |
| refresh-external-catalogs.queue-size | 1000000 | The queue size of the
executors of the external catalog explorer. |
| refresh-external-catalogs.thread-count | 10 | The number of threads used for
discovering tables in external catalogs. |
@@ -108,6 +110,10 @@ table td:last-child, table th:last-child { width: 40%;
word-break: break-all; }
| self-optimizing.commit-thread-count | 10 | The number of threads that
self-optimizing uses to submit results. |
| self-optimizing.plan-manifest-io-thread-count | 10 | Sets the size of the
worker pool. The worker pool limits the number of tasks concurrently processing
manifests in the base table implementation across all concurrent planning
operations. |
| self-optimizing.refresh-group-interval | 30 s | Optimizer group refresh
interval. |
+| self-optimizing.runtime-data-expire-interval | 1 h | Interval between
self-optimizing runtime data expiration runs. |
+| self-optimizing.runtime-data-expire-interval-hours | 1 | Deprecated: use
'self-optimizing.runtime-data-expire-interval' instead. The interval, in hours,
between self-optimizing runtime data expiration runs. |
+| self-optimizing.runtime-data-keep-days | 30 | Deprecated: use
'self-optimizing.runtime-data-keep-time' instead. The number of days that
self-optimizing runtime data is retained. |
+| self-optimizing.runtime-data-keep-time | 30 d | Duration that
self-optimizing runtime data is retained. |
| server-bind-host | 0.0.0.0 | The host bound to the server. |
| server-expose-host | | The exposed host of the server. |
| table-manifest-io.thread-count | 20 | Sets the size of the worker pool. The
worker pool limits the number of tasks concurrently processing manifests in the
base table implementation across all concurrent planning or commit operations. |