This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new acf8ef47a [AMORO-4246] Refactor process data expiring via
ProcessFactory plugin (#4247)
acf8ef47a is described below
commit acf8ef47a3d9305ef217399a9aeb8c6d312bbe75
Author: WenLingzhang <[email protected]>
AuthorDate: Thu Jun 11 17:16:56 2026 +0800
[AMORO-4246] Refactor process data expiring via ProcessFactory plugin
(#4247)
* Refactor process data expiring via ProcessFactory plugin
* fixup
* fixup
* fixup
---------
Co-authored-by: 张文领 <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 50 ---------------
.../apache/amoro/server/AmoroServiceContainer.java | 1 -
.../process/iceberg/IcebergProcessFactory.java | 60 ++++++++++++++++++
.../iceberg/ProcessDataExpiringProcess.java} | 72 ++++++++++++----------
.../scheduler/inline/InlineTableExecutors.java | 24 --------
.../org/apache/amoro/server/AmsEnvironment.java | 2 -
.../amoro/server/TestAmoroManagementConf.java | 26 --------
.../process/iceberg/TestIcebergProcessFactory.java | 40 ++++++++----
.../iceberg/TestProcessDataExpiringProcess.java} | 38 ++++++------
.../inline/TestConfigurableIntervalExecutors.java | 56 -----------------
.../main/java/org/apache/amoro/IcebergActions.java | 1 +
.../amoro/process/TestLocalExecutionEngine.java | 2 +
dist/src/main/amoro-bin/conf/config.yaml | 5 --
.../amoro-bin/conf/plugins/execute-engines.yaml | 3 +-
.../amoro-bin/conf/plugins/process-factories.yaml | 3 +
docs/admin-guides/deployment.md | 4 ++
docs/configuration/ams-config.md | 6 --
17 files changed, 160 insertions(+), 233 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index f38f6f5ac..2ac6f5cfb 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -395,38 +395,6 @@ public class AmoroManagementConf {
.defaultValue(10)
.withDescription("The number of threads that self-optimizing uses to
submit results.");
- /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_KEEP_TIME} instead. */
- @Deprecated
- public static final ConfigOption<Integer> OPTIMIZING_RUNTIME_DATA_KEEP_DAYS =
- ConfigOptions.key("self-optimizing.runtime-data-keep-days")
- .intType()
- .defaultValue(30)
- .withDescription(
- "Deprecated: use 'self-optimizing.runtime-data-keep-time'
instead. "
- + "The number of days that self-optimizing runtime data
keeps the runtime.");
-
- /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL}
instead. */
- @Deprecated
- public static final ConfigOption<Integer>
OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS =
- ConfigOptions.key("self-optimizing.runtime-data-expire-interval-hours")
- .intType()
- .defaultValue(1)
- .withDescription(
- "Deprecated: use 'self-optimizing.runtime-data-expire-interval'
instead. "
- + "The number of hours that self-optimizing runtime data
expire interval.");
-
- public static final ConfigOption<Duration> OPTIMIZING_RUNTIME_DATA_KEEP_TIME
=
- ConfigOptions.key("self-optimizing.runtime-data-keep-time")
- .durationType()
- .defaultValue(Duration.ofDays(30))
- .withDescription("Duration that self-optimizing runtime data is
retained.");
-
- public static final ConfigOption<Duration>
OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL =
- ConfigOptions.key("self-optimizing.runtime-data-expire-interval")
- .durationType()
- .defaultValue(Duration.ofHours(1))
- .withDescription("Interval between self-optimizing runtime data
expiration runs.");
-
public static final ConfigOption<Boolean>
OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED =
ConfigOptions.key("self-optimizing.break-quota-limit-enabled")
.booleanType()
@@ -434,24 +402,6 @@ public class AmoroManagementConf {
.withDescription(
"Allow the table to break the quota limit when the resource is
sufficient.");
- /** @deprecated Use {@link #PROCESS_HISTORY_DATA_KEEP_TIME} instead. */
- @Deprecated
- public static final ConfigOption<Integer> PROCESS_HISTORY_DATA_KEEP_DAYS =
- ConfigOptions.key("process.history-data-keep-days")
- .intType()
- .defaultValue(7)
- .withDescription(
- "Deprecated: use 'process.history-data-keep-time' instead. "
- + "The number of days that process history data is
retained.");
-
- public static final ConfigOption<Duration> PROCESS_HISTORY_DATA_KEEP_TIME =
- ConfigOptions.key("process.history-data-keep-time")
- .durationType()
- .defaultValue(Duration.ofDays(7))
- .withDescription(
- "Duration that process history data is retained. "
- + "Expired terminal process records will be deleted
automatically.");
-
public static final ConfigOption<Duration> OVERVIEW_CACHE_REFRESH_INTERVAL =
ConfigOptions.key("overview-cache.refresh-interval")
.durationType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 638354983..c717d71b8 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -290,7 +290,6 @@ public class AmoroServiceContainer {
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(processService.getTableHandlerChain());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
-
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
tableService.initialize();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 6df589cc9..9f9f901de 100755
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -95,7 +95,36 @@ public class IcebergProcessFactory implements ProcessFactory
{
.durationType()
.defaultValue(Duration.ofMinutes(10));
+ public static final ConfigOption<Duration>
EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL =
+ ConfigOptions.key("expire-process-data.runtime-data-expire-interval")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription(
+ "The interval for expiring process runtime data, "
+ + "including optimizing runtime data and completed process
metadata.");
+
+ public static final ConfigOption<Duration>
EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME =
+ ConfigOptions.key("expire-process-data.runtime-data-keep-time")
+ .durationType()
+ .defaultValue(Duration.ofDays(30))
+ .withDescription(
+ "The maximum retention time for process runtime data "
+ + "(e.g., process records, process states, task runtimes,
optimizing quotas). "
+ + "Data older than this will be cleaned up during
expire-process-data execution.");
+
+ public static final ConfigOption<Duration>
EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME =
+ ConfigOptions.key("expire-process-data.history-data-keep-time")
+ .durationType()
+ .defaultValue(Duration.ofDays(7))
+ .withDescription(
+ "The maximum retention time for completed process history data. "
+ + "Only applies when it is shorter than
runtime-data-keep-time,"
+ + " and only affects terminal process records
(SUCCESS/FAILED). "
+ + "Active processes (RUNNING/SUBMITTED/PENDING/CANCELING)
are never deleted.");
+
private ExecuteEngine localEngine;
+ private long expireProcessDataRuntimeKeepTimeMs;
+ private long expireProcessDataHistoryKeepTimeMs;
private final Map<Action, ProcessTriggerStrategy> actions =
Maps.newHashMap();
private final List<TableFormat> formats =
Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE);
@@ -139,6 +168,8 @@ public class IcebergProcessFactory implements
ProcessFactory {
return triggerAutoCreateTag(tableRuntime);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return triggerHiveCommitSync(tableRuntime);
+ } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) {
+ return triggerProcessDataExpiring(tableRuntime);
}
return Optional.empty();
@@ -170,6 +201,12 @@ public class IcebergProcessFactory implements
ProcessFactory {
return new TagsAutoCreatingProcess(tableRuntime, localEngine);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return new HiveCommitSyncProcess(tableRuntime, localEngine);
+ } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) {
+ return new ProcessDataExpiringProcess(
+ tableRuntime,
+ localEngine,
+ expireProcessDataRuntimeKeepTimeMs,
+ expireProcessDataHistoryKeepTimeMs);
}
throw new RecoverProcessFailedException(
@@ -217,6 +254,16 @@ public class IcebergProcessFactory implements
ProcessFactory {
this.actions.put(
IcebergActions.SYNC_HIVE_TABLES,
ProcessTriggerStrategy.triggerAtFixRate(interval));
}
+
+ Duration expireProcessDataInterval =
+ configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL);
+ this.actions.put(
+ IcebergActions.EXPIRE_PROCESS_DATA,
+ ProcessTriggerStrategy.triggerAtFixRate(expireProcessDataInterval));
+ this.expireProcessDataRuntimeKeepTimeMs =
+
configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME).toMillis();
+ this.expireProcessDataHistoryKeepTimeMs =
+
configs.getDuration(EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME).toMillis();
}
private Optional<TableProcess> triggerExpireSnapshot(TableRuntime
tableRuntime) {
@@ -301,6 +348,19 @@ public class IcebergProcessFactory implements
ProcessFactory {
return Optional.of(new HiveCommitSyncProcess(tableRuntime, localEngine));
}
+ private Optional<TableProcess> triggerProcessDataExpiring(TableRuntime
tableRuntime) {
+ if (localEngine == null) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ new ProcessDataExpiringProcess(
+ tableRuntime,
+ localEngine,
+ expireProcessDataRuntimeKeepTimeMs,
+ expireProcessDataHistoryKeepTimeMs));
+ }
+
@Override
public void close() {}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
similarity index 66%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
rename to
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
index 1bc4e9ac7..f08614121 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
@@ -16,62 +16,71 @@
* limitations under the License.
*/
-package org.apache.amoro.server.scheduler.inline;
+package org.apache.amoro.server.process.iceberg;
+import org.apache.amoro.Action;
+import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalProcess;
+import org.apache.amoro.process.TableProcess;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
+import java.util.Map;
-public class ProcessDataExpiringExecutor extends PeriodicTableScheduler {
- private static final Logger LOG =
LoggerFactory.getLogger(ProcessDataExpiringExecutor.class);
+/** Local table process for expiring optimizing runtime data and process
history records. */
+public class ProcessDataExpiringProcess extends TableProcess implements
LocalProcess {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ProcessDataExpiringProcess.class);
private final Persistency persistency = new Persistency();
private final long optimizingKeepTimeMs;
private final long processKeepTimeMs;
- private final long expireIntervalMs;
- public ProcessDataExpiringExecutor(
- TableService tableService,
- Duration optimizingKeepTime,
- Duration expireInterval,
- Duration processKeepTime) {
- super(tableService, 1);
- this.optimizingKeepTimeMs = optimizingKeepTime.toMillis();
- this.processKeepTimeMs = processKeepTime.toMillis();
- this.expireIntervalMs = expireInterval.toMillis();
+ public ProcessDataExpiringProcess(
+ TableRuntime tableRuntime,
+ ExecuteEngine engine,
+ long optimizingKeepTimeMs,
+ long processKeepTimeMs) {
+ super(tableRuntime, engine);
+ this.optimizingKeepTimeMs = optimizingKeepTimeMs;
+ this.processKeepTimeMs = processKeepTimeMs;
}
@Override
- protected long getNextExecutingTime(TableRuntime tableRuntime) {
- return expireIntervalMs;
+ public String tag() {
+ return getAction().getName().toLowerCase();
}
@Override
- protected boolean enabled(TableRuntime tableRuntime) {
- return true;
+ public void run() {
+ try {
+ persistency.doExpiring(tableRuntime);
+ } catch (Throwable t) {
+ LOG.error("Expiring table runtimes of {} failed.",
tableRuntime.getTableIdentifier(), t);
+ throw new RuntimeException(t);
+ }
}
@Override
- protected long getExecutorDelay() {
- return 0;
+ public Action getAction() {
+ return IcebergActions.EXPIRE_PROCESS_DATA;
}
@Override
- protected void execute(TableRuntime tableRuntime) {
- try {
- persistency.doExpiring(tableRuntime);
- } catch (Throwable throwable) {
- LOG.error(
- "Expiring table runtimes of {} failed.",
tableRuntime.getTableIdentifier(), throwable);
- }
+ public Map<String, String> getProcessParameters() {
+ return Maps.newHashMap();
+ }
+
+ @Override
+ public Map<String, String> getSummary() {
+ return Maps.newHashMap();
}
private class Persistency extends PersistentBase {
@@ -100,8 +109,9 @@ public class ProcessDataExpiringExecutor extends
PeriodicTableScheduler {
mapper -> mapper.deleteOptimizingQuotaBefore(tableId,
optimizingMinId)));
// 2. Expire process history terminal records (processKeepTimeMs, e.g.
7d)
- // Only deletes terminal records in the window between
processKeepTime and keepTime,
- // since records older than keepTime are already removed by step 1.
+ // Only deletes terminal records in the window between processKeepTime
+ // and optimizingKeepTime, since records older than optimizingKeepTime
+ // are already removed by step 1.
if (processKeepTimeMs < optimizingKeepTimeMs) {
long processMinId = SnowflakeIdGenerator.getMinSnowflakeId(now -
processKeepTimeMs);
doAs(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 52c96ed1d..0cc084802 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -22,15 +22,12 @@ import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.table.TableService;
-import java.time.Duration;
-
public class InlineTableExecutors {
private static final InlineTableExecutors instance = new
InlineTableExecutors();
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
- private ProcessDataExpiringExecutor processDataExpiringExecutor;
public static InlineTableExecutors getInstance() {
return instance;
@@ -40,23 +37,6 @@ public class InlineTableExecutors {
this.optimizingCommitExecutor =
new OptimizingCommitExecutor(
tableService,
conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT));
- Duration optimizingKeepTime =
- conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME)
- ? conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME)
- : Duration.ofDays(
-
conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS));
- Duration expireInterval =
-
conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL)
- ?
conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL)
- : Duration.ofHours(
-
conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS));
- Duration processKeepTime =
- conf.contains(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME)
- ? conf.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME)
- :
Duration.ofDays(conf.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS));
- this.processDataExpiringExecutor =
- new ProcessDataExpiringExecutor(
- tableService, optimizingKeepTime, expireInterval, processKeepTime);
this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService);
this.tableRefreshingExecutor =
new TableRuntimeRefreshExecutor(
@@ -77,8 +57,4 @@ public class InlineTableExecutors {
public OptimizingCommitExecutor getOptimizingCommitExecutor() {
return optimizingCommitExecutor;
}
-
- public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() {
- return processDataExpiringExecutor;
- }
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index 07898363a..1e770e359 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -385,8 +385,6 @@ public class AmsEnvironment {
+ "\n"
+ " self-optimizing:\n"
+ " commit-thread-count: 10\n"
- + " runtime-data-keep-days: 30\n"
- + " runtime-data-expire-interval-hours: 1\n"
+ " break-quota-limit-enabled: true\n"
+ "\n"
+ " database:\n"
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
index 414f760c2..63a308166 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
@@ -75,32 +75,6 @@ public class TestAmoroManagementConf {
assertTimeRelatedConfigs(serviceConfig, expectedConfig);
}
- @Test
- void testNewDurationConfigDefaults() {
- Configurations serviceConfig = new Configurations();
- Assertions.assertEquals(
- Duration.ofDays(30),
-
serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME));
- Assertions.assertEquals(
- Duration.ofHours(1),
-
serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL));
- Assertions.assertEquals(
- Duration.ofDays(7),
serviceConfig.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME));
- }
-
- @Test
- void testDeprecatedIntegerConfigDefaults() {
- Configurations serviceConfig = new Configurations();
- Assertions.assertEquals(
- 30,
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS));
- Assertions.assertEquals(
- 1,
- serviceConfig.getInteger(
-
AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS));
- Assertions.assertEquals(
- 7,
serviceConfig.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS));
- }
-
@Test
void testParsingDefaultStorageRelatedConfigs() {
Configurations serviceConfig = new Configurations();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 45aa9e9bf..e3f1c73f4 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -55,6 +55,8 @@ public class TestIcebergProcessFactory {
"clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE,
Duration.ofHours(24));
assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA,
Duration.ofHours(24));
assertSupportedAction("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES,
Duration.ofHours(1));
+ assertSupportedAction(
+ "expire-process-data", IcebergActions.EXPIRE_PROCESS_DATA,
Duration.ofHours(1));
}
@Test
@@ -73,6 +75,11 @@ public class TestIcebergProcessFactory {
"auto-create-tags", IcebergActions.AUTO_CREATE_TAGS,
TagsAutoCreatingProcess.class, 0);
assertTriggerWhenDue(
"sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES,
HiveCommitSyncProcess.class, 0);
+ assertTriggerWhenDue(
+ "expire-process-data",
+ IcebergActions.EXPIRE_PROCESS_DATA,
+ ProcessDataExpiringProcess.class,
+ 0);
}
@Test
@@ -134,6 +141,14 @@ public class TestIcebergProcessFactory {
assertRecover("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES,
HiveCommitSyncProcess.class);
}
+ @Test
+ public void testRecoverExpireProcessDataProcess() {
+ assertRecover(
+ "expire-process-data",
+ IcebergActions.EXPIRE_PROCESS_DATA,
+ ProcessDataExpiringProcess.class);
+ }
+
@Test
public void testRecoverUnsupportedActionThrows() {
IcebergProcessFactory factory = openedFactory("expire-snapshots");
@@ -180,12 +195,20 @@ public class TestIcebergProcessFactory {
Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME,
process.getExecutionEngine());
}
+ private Map<String, String> buildFactoryProperties(String configKey, String
interval) {
+ Map<String, String> properties = new HashMap<>();
+ if ("expire-process-data".equals(configKey)) {
+ properties.put("expire-process-data.runtime-data-expire-interval",
interval);
+ } else {
+ properties.put(configKey + ".enabled", "true");
+ properties.put(configKey + ".interval", interval);
+ }
+ return properties;
+ }
+
private IcebergProcessFactory openedFactory(String configKey) {
IcebergProcessFactory factory = new IcebergProcessFactory();
- Map<String, String> properties = new HashMap<>();
- properties.put(configKey + ".enabled", "true");
- properties.put(configKey + ".interval", "1h");
- factory.open(properties);
+ factory.open(buildFactoryProperties(configKey, "1h"));
return factory;
}
@@ -193,9 +216,7 @@ public class TestIcebergProcessFactory {
String configKey, org.apache.amoro.Action action, Duration interval) {
IcebergProcessFactory factory = new IcebergProcessFactory();
- Map<String, String> properties = new HashMap<>();
- properties.put(configKey + ".enabled", "true");
- properties.put(configKey + ".interval", interval.toHours() + "h");
+ Map<String, String> properties = buildFactoryProperties(configKey,
interval.toHours() + "h");
factory.open(properties);
@@ -212,10 +233,7 @@ public class TestIcebergProcessFactory {
String configKey, org.apache.amoro.Action action, Class<?> processClass,
long lastTime) {
IcebergProcessFactory factory = new IcebergProcessFactory();
- Map<String, String> properties = new HashMap<>();
- properties.put(configKey + ".enabled", "true");
- properties.put(configKey + ".interval", "1h");
- factory.open(properties);
+ factory.open(buildFactoryProperties(configKey, "1h"));
LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
similarity index 87%
rename from
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
rename to
amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
index 8e2139ba6..2f714d8e0 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package org.apache.amoro.server.scheduler.inline;
+package org.apache.amoro.server.process.iceberg;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.process.LocalExecutionEngine;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.AMSServiceTestBase;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.junit.Assert;
import org.junit.Before;
@@ -37,7 +37,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.List;
-public class TestProcessDataExpiringExecutor extends AMSServiceTestBase {
+public class TestProcessDataExpiringProcess extends AMSServiceTestBase {
private static final long TABLE_ID = 1L;
private static final ServerTableIdentifier TABLE_IDENTIFIER =
@@ -46,12 +46,13 @@ public class TestProcessDataExpiringExecutor extends
AMSServiceTestBase {
private final Persistency persistency = new Persistency();
private DefaultTableRuntime tableRuntime;
- private TableService tableService;
+ private LocalExecutionEngine engine;
@Before
public void mock() {
tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- tableService = Mockito.mock(TableService.class);
+ engine = Mockito.mock(LocalExecutionEngine.class);
+ Mockito.when(engine.name()).thenReturn(LocalExecutionEngine.ENGINE_NAME);
Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(TABLE_IDENTIFIER);
// Clean up any leftover data
persistency.cleanAll(TABLE_ID);
@@ -61,7 +62,6 @@ public class TestProcessDataExpiringExecutor extends
AMSServiceTestBase {
public void testProcessHistoryExpiringWhenShorterThanKeepTime() {
// optimizingKeepTime=30d, processKeepTime=7d
Duration optimizingKeepTime = Duration.ofDays(30);
- Duration expireInterval = Duration.ofHours(1);
Duration processKeepTime = Duration.ofDays(7);
long now = System.currentTimeMillis();
@@ -78,10 +78,10 @@ public class TestProcessDataExpiringExecutor extends
AMSServiceTestBase {
Assert.assertEquals(2, persistency.listProcesses(TABLE_ID).size());
- ProcessDataExpiringExecutor executor =
- new ProcessDataExpiringExecutor(
- tableService, optimizingKeepTime, expireInterval, processKeepTime);
- executor.execute(tableRuntime);
+ ProcessDataExpiringProcess process =
+ new ProcessDataExpiringProcess(
+ tableRuntime, engine, optimizingKeepTime.toMillis(),
processKeepTime.toMillis());
+ process.run();
List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID);
Assert.assertEquals(1, remaining.size());
@@ -92,7 +92,6 @@ public class TestProcessDataExpiringExecutor extends
AMSServiceTestBase {
public void testProcessHistoryNotExpiringWhenEqualToKeepTime() {
// When processKeepTime >= optimizingKeepTime, the extra process cleanup
should not trigger
Duration optimizingKeepTime = Duration.ofDays(7);
- Duration expireInterval = Duration.ofHours(1);
Duration processKeepTime = Duration.ofDays(7);
long now = System.currentTimeMillis();
@@ -104,10 +103,10 @@ public class TestProcessDataExpiringExecutor extends
AMSServiceTestBase {
Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size());
- ProcessDataExpiringExecutor executor =
- new ProcessDataExpiringExecutor(
- tableService, optimizingKeepTime, expireInterval, processKeepTime);
- executor.execute(tableRuntime);
+ ProcessDataExpiringProcess process =
+ new ProcessDataExpiringProcess(
+ tableRuntime, engine, optimizingKeepTime.toMillis(),
processKeepTime.toMillis());
+ process.run();
// The process should still exist - not expired by either mechanism
Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size());
@@ -117,7 +116,6 @@ public class TestProcessDataExpiringExecutor extends
AMSServiceTestBase {
public void testDeleteExpiredProcessesSkipsActiveStatuses() {
// optimizingKeepTime=30d, processKeepTime=7d
Duration optimizingKeepTime = Duration.ofDays(30);
- Duration expireInterval = Duration.ofHours(1);
Duration processKeepTime = Duration.ofDays(7);
long now = System.currentTimeMillis();
@@ -145,10 +143,10 @@ public class TestProcessDataExpiringExecutor extends
AMSServiceTestBase {
Assert.assertEquals(6, persistency.listProcesses(TABLE_ID).size());
- ProcessDataExpiringExecutor executor =
- new ProcessDataExpiringExecutor(
- tableService, optimizingKeepTime, expireInterval, processKeepTime);
- executor.execute(tableRuntime);
+ ProcessDataExpiringProcess process =
+ new ProcessDataExpiringProcess(
+ tableRuntime, engine, optimizingKeepTime.toMillis(),
processKeepTime.toMillis());
+ process.run();
List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID);
// Active statuses (RUNNING, SUBMITTED, PENDING, CANCELING) should survive
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
deleted file mode 100644
index 4a7c2895d..000000000
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.TableRuntime;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.time.Duration;
-
-/** Tests for configurable interval in ProcessDataExpiringExecutor. */
-public class TestConfigurableIntervalExecutors {
-
- @Test
- public void testProcessDataExpiringDefaultInterval() {
- Duration optimizingKeepTime = Duration.ofDays(30);
- Duration expireInterval = Duration.ofHours(1);
- Duration processKeepTime = Duration.ofDays(7);
- ProcessDataExpiringExecutor executor =
- new ProcessDataExpiringExecutor(null, optimizingKeepTime,
expireInterval, processKeepTime);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(
- Duration.ofHours(1).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-
- @Test
- public void testProcessDataExpiringCustomInterval() {
- Duration optimizingKeepTime = Duration.ofDays(15);
- Duration expireInterval = Duration.ofMinutes(30);
- Duration processKeepTime = Duration.ofDays(3);
- ProcessDataExpiringExecutor executor =
- new ProcessDataExpiringExecutor(null, optimizingKeepTime,
expireInterval, processKeepTime);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(
- Duration.ofMinutes(30).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-}
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index b454bc9c1..89bdd055a 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -31,4 +31,5 @@ public class IcebergActions {
public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots");
public static final Action CLEAN_DANGLING_DELETE = Action.register("clean-dangling-delete-files");
public static final Action AUTO_CREATE_TAGS = Action.register("auto-create-tags");
+ public static final Action EXPIRE_PROCESS_DATA = Action.register("expire-process-data");
}
diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
index fb91cb63b..f3568cf03 100644
--- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
+++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -51,6 +51,7 @@ public class TestLocalExecutionEngine {
assertCustomPoolByTag("expire-data");
assertCustomPoolByTag("auto-create-tags");
assertCustomPoolByTag("sync-hive-tables");
+ assertCustomPoolByTag("expire-process-data");
}
private void assertCustomPoolByTag(String tag) throws Exception {
@@ -162,6 +163,7 @@ public class TestLocalExecutionEngine {
properties.put("pool.expire-data.thread-count", "1");
properties.put("pool.auto-create-tags.thread-count", "1");
properties.put("pool.sync-hive-tables.thread-count", "1");
+ properties.put("pool.expire-process-data.thread-count", "1");
properties.put("process.status.ttl", ttl);
localEngine.open(properties);
return localEngine;
diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml
index f4cdaaac8..ae2ee8d39 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -81,13 +81,8 @@ ams:
interval: 1min # 60000
max-pending-partition-count: 100 # default 100
- process:
- history-data-keep-time: 7d
-
self-optimizing:
commit-thread-count: 10
- runtime-data-keep-time: 30d
- runtime-data-expire-interval: 1h
refresh-group-interval: 30s
break-quota-limit-enabled: true
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 14ebcc4e9..dcb5fd934 100755
--- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -26,4 +26,5 @@ execute-engines:
pool.clean-dangling-delete-files.thread-count: 10
pool.expire-data.thread-count: 10
pool.auto-create-tags.thread-count: 3
- pool.sync-hive-tables.thread-count: 10
\ No newline at end of file
+ pool.sync-hive-tables.thread-count: 10
+ pool.expire-process-data.thread-count: 1
\ No newline at end of file
diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index 1085d364f..6bb8c3830 100755
--- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -32,3 +32,6 @@ process-factories:
auto-create-tags.interval: "1min"
sync-hive-tables.enabled: "false"
sync-hive-tables.interval: "10min"
+ expire-process-data.runtime-data-expire-interval: "1h"
+ expire-process-data.runtime-data-keep-time: "30d"
+ expire-process-data.history-data-keep-time: "7d"
diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md
index 14f32149f..1217567bb 100644
--- a/docs/admin-guides/deployment.md
+++ b/docs/admin-guides/deployment.md
@@ -282,6 +282,9 @@ process-factories:
auto-create-tags.interval: "1m" # interval for auto creating tags
sync-hive-tables.enabled: "false" # enable synchronizing Hive tables
sync-hive-tables.interval: "10min" # interval for synchronizing Hive tables
+ expire-process-data.runtime-data-expire-interval: "1h" # interval for expiring process data
+ expire-process-data.runtime-data-keep-time: "30d" # duration to keep optimizing runtime data
+ expire-process-data.history-data-keep-time: "7d" # duration to keep terminal process history records
```
{{< hint info >}}
@@ -316,6 +319,7 @@ execute-engines:
pool.expire-data.thread-count: 10 # thread pool for data expiration
pool.auto-create-tags.thread-count: 3 # thread pool for auto creating tags
pool.sync-hive-tables.thread-count: 10 # thread pool for synchronizing Hive tables
+ pool.expire-process-data.thread-count: 1 # thread pool for expiring process data
process.status.ttl: 4h # TTL for process status cache
```
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 2a6c6b6e8..96c630070 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -97,8 +97,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| optimizer.task-execute-timeout | 2147483647 s | Timeout duration for task execution, default to Integer.MAX_VALUE seconds(about 24,855 days). |
| overview-cache.max-size | 3360 | Max size of overview cache. |
| overview-cache.refresh-interval | 3 min | Interval for refreshing overview cache. |
-| process.history-data-keep-days | 7 | Deprecated: use 'process.history-data-keep-time' instead. The number of days that process history data is retained. |
-| process.history-data-keep-time | 7 d | Duration that process history data is retained. Expired terminal process records will be deleted automatically. |
| refresh-external-catalogs.interval | 3 min | Interval to refresh the external catalog. |
| refresh-external-catalogs.queue-size | 1000000 | The queue size of the executors of the external catalog explorer. |
| refresh-external-catalogs.thread-count | 10 | The number of threads used for discovering tables in external catalogs. |
@@ -110,10 +108,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| self-optimizing.commit-thread-count | 10 | The number of threads that self-optimizing uses to submit results. |
| self-optimizing.plan-manifest-io-thread-count | 10 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning operations. |
| self-optimizing.refresh-group-interval | 30 s | Optimizer group refresh interval. |
-| self-optimizing.runtime-data-expire-interval | 1 h | Interval between self-optimizing runtime data expiration runs. |
-| self-optimizing.runtime-data-expire-interval-hours | 1 | Deprecated: use 'self-optimizing.runtime-data-expire-interval' instead. The number of hours that self-optimizing runtime data expire interval. |
-| self-optimizing.runtime-data-keep-days | 30 | Deprecated: use 'self-optimizing.runtime-data-keep-time' instead. The number of days that self-optimizing runtime data keeps the runtime. |
-| self-optimizing.runtime-data-keep-time | 30 d | Duration that self-optimizing runtime data is retained. |
| server-bind-host | 0.0.0.0 | The host bound to the server. |
| server-expose-host | | The exposed host of the server. |
| table-manifest-io.thread-count | 20 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning or commit operations. |