This is an automated email from the ASF dual-hosted git repository.

zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new acf8ef47a [AMORO-4246] Refactor process data expiring via 
ProcessFactory plugin (#4247)
acf8ef47a is described below

commit acf8ef47a3d9305ef217399a9aeb8c6d312bbe75
Author: WenLingzhang <[email protected]>
AuthorDate: Thu Jun 11 17:16:56 2026 +0800

    [AMORO-4246] Refactor process data expiring via ProcessFactory plugin 
(#4247)
    
    * Refactor process data expiring via ProcessFactory plugin
    
    * fixup
    
    * fixup
    
    * fixup
    
    ---------
    
    Co-authored-by: 张文领 <[email protected]>
---
 .../apache/amoro/server/AmoroManagementConf.java   | 50 ---------------
 .../apache/amoro/server/AmoroServiceContainer.java |  1 -
 .../process/iceberg/IcebergProcessFactory.java     | 60 ++++++++++++++++++
 .../iceberg/ProcessDataExpiringProcess.java}       | 72 ++++++++++++----------
 .../scheduler/inline/InlineTableExecutors.java     | 24 --------
 .../org/apache/amoro/server/AmsEnvironment.java    |  2 -
 .../amoro/server/TestAmoroManagementConf.java      | 26 --------
 .../process/iceberg/TestIcebergProcessFactory.java | 40 ++++++++----
 .../iceberg/TestProcessDataExpiringProcess.java}   | 38 ++++++------
 .../inline/TestConfigurableIntervalExecutors.java  | 56 -----------------
 .../main/java/org/apache/amoro/IcebergActions.java |  1 +
 .../amoro/process/TestLocalExecutionEngine.java    |  2 +
 dist/src/main/amoro-bin/conf/config.yaml           |  5 --
 .../amoro-bin/conf/plugins/execute-engines.yaml    |  3 +-
 .../amoro-bin/conf/plugins/process-factories.yaml  |  3 +
 docs/admin-guides/deployment.md                    |  4 ++
 docs/configuration/ams-config.md                   |  6 --
 17 files changed, 160 insertions(+), 233 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index f38f6f5ac..2ac6f5cfb 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -395,38 +395,6 @@ public class AmoroManagementConf {
           .defaultValue(10)
           .withDescription("The number of threads that self-optimizing uses to 
submit results.");
 
-  /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_KEEP_TIME} instead. */
-  @Deprecated
-  public static final ConfigOption<Integer> OPTIMIZING_RUNTIME_DATA_KEEP_DAYS =
-      ConfigOptions.key("self-optimizing.runtime-data-keep-days")
-          .intType()
-          .defaultValue(30)
-          .withDescription(
-              "Deprecated: use 'self-optimizing.runtime-data-keep-time' 
instead. "
-                  + "The number of days that self-optimizing runtime data 
keeps the runtime.");
-
-  /** @deprecated Use {@link #OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL} 
instead. */
-  @Deprecated
-  public static final ConfigOption<Integer> 
OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS =
-      ConfigOptions.key("self-optimizing.runtime-data-expire-interval-hours")
-          .intType()
-          .defaultValue(1)
-          .withDescription(
-              "Deprecated: use 'self-optimizing.runtime-data-expire-interval' 
instead. "
-                  + "The number of hours that self-optimizing runtime data 
expire interval.");
-
-  public static final ConfigOption<Duration> OPTIMIZING_RUNTIME_DATA_KEEP_TIME 
=
-      ConfigOptions.key("self-optimizing.runtime-data-keep-time")
-          .durationType()
-          .defaultValue(Duration.ofDays(30))
-          .withDescription("Duration that self-optimizing runtime data is 
retained.");
-
-  public static final ConfigOption<Duration> 
OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL =
-      ConfigOptions.key("self-optimizing.runtime-data-expire-interval")
-          .durationType()
-          .defaultValue(Duration.ofHours(1))
-          .withDescription("Interval between self-optimizing runtime data 
expiration runs.");
-
   public static final ConfigOption<Boolean> 
OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED =
       ConfigOptions.key("self-optimizing.break-quota-limit-enabled")
           .booleanType()
@@ -434,24 +402,6 @@ public class AmoroManagementConf {
           .withDescription(
               "Allow the table to break the quota limit when the resource is 
sufficient.");
 
-  /** @deprecated Use {@link #PROCESS_HISTORY_DATA_KEEP_TIME} instead. */
-  @Deprecated
-  public static final ConfigOption<Integer> PROCESS_HISTORY_DATA_KEEP_DAYS =
-      ConfigOptions.key("process.history-data-keep-days")
-          .intType()
-          .defaultValue(7)
-          .withDescription(
-              "Deprecated: use 'process.history-data-keep-time' instead. "
-                  + "The number of days that process history data is 
retained.");
-
-  public static final ConfigOption<Duration> PROCESS_HISTORY_DATA_KEEP_TIME =
-      ConfigOptions.key("process.history-data-keep-time")
-          .durationType()
-          .defaultValue(Duration.ofDays(7))
-          .withDescription(
-              "Duration that process history data is retained. "
-                  + "Expired terminal process records will be deleted 
automatically.");
-
   public static final ConfigOption<Duration> OVERVIEW_CACHE_REFRESH_INTERVAL =
       ConfigOptions.key("overview-cache.refresh-interval")
           .durationType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 638354983..c717d71b8 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -290,7 +290,6 @@ public class AmoroServiceContainer {
     addHandlerChain(optimizingService.getTableRuntimeHandler());
     addHandlerChain(processService.getTableHandlerChain());
     
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
-    
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
     tableService.initialize();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
index 6df589cc9..9f9f901de 100755
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -95,7 +95,36 @@ public class IcebergProcessFactory implements ProcessFactory 
{
           .durationType()
           .defaultValue(Duration.ofMinutes(10));
 
+  public static final ConfigOption<Duration> 
EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL =
+      ConfigOptions.key("expire-process-data.runtime-data-expire-interval")
+          .durationType()
+          .defaultValue(Duration.ofHours(1))
+          .withDescription(
+              "The interval for expiring process runtime data, "
+                  + "including optimizing runtime data and completed process 
metadata.");
+
+  public static final ConfigOption<Duration> 
EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME =
+      ConfigOptions.key("expire-process-data.runtime-data-keep-time")
+          .durationType()
+          .defaultValue(Duration.ofDays(30))
+          .withDescription(
+              "The maximum retention time for process runtime data "
+                  + "(e.g., process records, process states, task runtimes, 
optimizing quotas). "
+                  + "Data older than this will be cleaned up during 
expire-process-data execution.");
+
+  public static final ConfigOption<Duration> 
EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME =
+      ConfigOptions.key("expire-process-data.history-data-keep-time")
+          .durationType()
+          .defaultValue(Duration.ofDays(7))
+          .withDescription(
+              "The maximum retention time for completed process history data. "
+                  + "Only applies when it is shorter than 
runtime-data-keep-time,"
+                  + " and only affects terminal process records 
(SUCCESS/FAILED). "
+                  + "Active processes (RUNNING/SUBMITTED/PENDING/CANCELING) 
are never deleted.");
+
   private ExecuteEngine localEngine;
+  private long expireProcessDataRuntimeKeepTimeMs;
+  private long expireProcessDataHistoryKeepTimeMs;
   private final Map<Action, ProcessTriggerStrategy> actions = 
Maps.newHashMap();
   private final List<TableFormat> formats =
       Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, 
TableFormat.MIXED_HIVE);
@@ -139,6 +168,8 @@ public class IcebergProcessFactory implements 
ProcessFactory {
       return triggerAutoCreateTag(tableRuntime);
     } else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
       return triggerHiveCommitSync(tableRuntime);
+    } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) {
+      return triggerProcessDataExpiring(tableRuntime);
     }
 
     return Optional.empty();
@@ -170,6 +201,12 @@ public class IcebergProcessFactory implements 
ProcessFactory {
       return new TagsAutoCreatingProcess(tableRuntime, localEngine);
     } else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
       return new HiveCommitSyncProcess(tableRuntime, localEngine);
+    } else if (IcebergActions.EXPIRE_PROCESS_DATA.equals(action)) {
+      return new ProcessDataExpiringProcess(
+          tableRuntime,
+          localEngine,
+          expireProcessDataRuntimeKeepTimeMs,
+          expireProcessDataHistoryKeepTimeMs);
     }
 
     throw new RecoverProcessFailedException(
@@ -217,6 +254,16 @@ public class IcebergProcessFactory implements 
ProcessFactory {
       this.actions.put(
           IcebergActions.SYNC_HIVE_TABLES, 
ProcessTriggerStrategy.triggerAtFixRate(interval));
     }
+
+    Duration expireProcessDataInterval =
+        configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_EXPIRE_INTERVAL);
+    this.actions.put(
+        IcebergActions.EXPIRE_PROCESS_DATA,
+        ProcessTriggerStrategy.triggerAtFixRate(expireProcessDataInterval));
+    this.expireProcessDataRuntimeKeepTimeMs =
+        
configs.getDuration(EXPIRE_PROCESS_DATA_RUNTIME_DATA_KEEP_TIME).toMillis();
+    this.expireProcessDataHistoryKeepTimeMs =
+        
configs.getDuration(EXPIRE_PROCESS_DATA_HISTORY_DATA_KEEP_TIME).toMillis();
   }
 
   private Optional<TableProcess> triggerExpireSnapshot(TableRuntime 
tableRuntime) {
@@ -301,6 +348,19 @@ public class IcebergProcessFactory implements 
ProcessFactory {
     return Optional.of(new HiveCommitSyncProcess(tableRuntime, localEngine));
   }
 
+  private Optional<TableProcess> triggerProcessDataExpiring(TableRuntime 
tableRuntime) {
+    if (localEngine == null) {
+      return Optional.empty();
+    }
+
+    return Optional.of(
+        new ProcessDataExpiringProcess(
+            tableRuntime,
+            localEngine,
+            expireProcessDataRuntimeKeepTimeMs,
+            expireProcessDataHistoryKeepTimeMs));
+  }
+
   @Override
   public void close() {}
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
similarity index 66%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
rename to 
amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
index 1bc4e9ac7..f08614121 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/ProcessDataExpiringExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/ProcessDataExpiringProcess.java
@@ -16,62 +16,71 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.scheduler.inline;
+package org.apache.amoro.server.process.iceberg;
 
+import org.apache.amoro.Action;
+import org.apache.amoro.IcebergActions;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalProcess;
+import org.apache.amoro.process.TableProcess;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
-import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
+import java.util.Map;
 
-public class ProcessDataExpiringExecutor extends PeriodicTableScheduler {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcessDataExpiringExecutor.class);
+/** Local table process for expiring optimizing runtime data and process 
history records. */
+public class ProcessDataExpiringProcess extends TableProcess implements 
LocalProcess {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ProcessDataExpiringProcess.class);
 
   private final Persistency persistency = new Persistency();
   private final long optimizingKeepTimeMs;
   private final long processKeepTimeMs;
-  private final long expireIntervalMs;
 
-  public ProcessDataExpiringExecutor(
-      TableService tableService,
-      Duration optimizingKeepTime,
-      Duration expireInterval,
-      Duration processKeepTime) {
-    super(tableService, 1);
-    this.optimizingKeepTimeMs = optimizingKeepTime.toMillis();
-    this.processKeepTimeMs = processKeepTime.toMillis();
-    this.expireIntervalMs = expireInterval.toMillis();
+  public ProcessDataExpiringProcess(
+      TableRuntime tableRuntime,
+      ExecuteEngine engine,
+      long optimizingKeepTimeMs,
+      long processKeepTimeMs) {
+    super(tableRuntime, engine);
+    this.optimizingKeepTimeMs = optimizingKeepTimeMs;
+    this.processKeepTimeMs = processKeepTimeMs;
   }
 
   @Override
-  protected long getNextExecutingTime(TableRuntime tableRuntime) {
-    return expireIntervalMs;
+  public String tag() {
+    return getAction().getName().toLowerCase();
   }
 
   @Override
-  protected boolean enabled(TableRuntime tableRuntime) {
-    return true;
+  public void run() {
+    try {
+      persistency.doExpiring(tableRuntime);
+    } catch (Throwable t) {
+      LOG.error("Expiring table runtimes of {} failed.", 
tableRuntime.getTableIdentifier(), t);
+      throw new RuntimeException(t);
+    }
   }
 
   @Override
-  protected long getExecutorDelay() {
-    return 0;
+  public Action getAction() {
+    return IcebergActions.EXPIRE_PROCESS_DATA;
   }
 
   @Override
-  protected void execute(TableRuntime tableRuntime) {
-    try {
-      persistency.doExpiring(tableRuntime);
-    } catch (Throwable throwable) {
-      LOG.error(
-          "Expiring table runtimes of {} failed.", 
tableRuntime.getTableIdentifier(), throwable);
-    }
+  public Map<String, String> getProcessParameters() {
+    return Maps.newHashMap();
+  }
+
+  @Override
+  public Map<String, String> getSummary() {
+    return Maps.newHashMap();
   }
 
   private class Persistency extends PersistentBase {
@@ -100,8 +109,9 @@ public class ProcessDataExpiringExecutor extends 
PeriodicTableScheduler {
                   mapper -> mapper.deleteOptimizingQuotaBefore(tableId, 
optimizingMinId)));
 
       // 2. Expire process history terminal records (processKeepTimeMs, e.g. 
7d)
-      //    Only deletes terminal records in the window between 
processKeepTime and keepTime,
-      //    since records older than keepTime are already removed by step 1.
+      //    Only deletes terminal records in the window between 
processKeepTime and
+      // optimizingKeepTime,
+      //    since records older than optimizingKeepTime are already removed by 
step 1.
       if (processKeepTimeMs < optimizingKeepTimeMs) {
         long processMinId = SnowflakeIdGenerator.getMinSnowflakeId(now - 
processKeepTimeMs);
         doAs(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 52c96ed1d..0cc084802 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -22,15 +22,12 @@ import org.apache.amoro.config.Configurations;
 import org.apache.amoro.server.AmoroManagementConf;
 import org.apache.amoro.server.table.TableService;
 
-import java.time.Duration;
-
 public class InlineTableExecutors {
 
   private static final InlineTableExecutors instance = new 
InlineTableExecutors();
   private TableRuntimeRefreshExecutor tableRefreshingExecutor;
   private BlockerExpiringExecutor blockerExpiringExecutor;
   private OptimizingCommitExecutor optimizingCommitExecutor;
-  private ProcessDataExpiringExecutor processDataExpiringExecutor;
 
   public static InlineTableExecutors getInstance() {
     return instance;
@@ -40,23 +37,6 @@ public class InlineTableExecutors {
     this.optimizingCommitExecutor =
         new OptimizingCommitExecutor(
             tableService, 
conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT));
-    Duration optimizingKeepTime =
-        conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME)
-            ? conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME)
-            : Duration.ofDays(
-                
conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS));
-    Duration expireInterval =
-        
conf.contains(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL)
-            ? 
conf.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL)
-            : Duration.ofHours(
-                
conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS));
-    Duration processKeepTime =
-        conf.contains(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME)
-            ? conf.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME)
-            : 
Duration.ofDays(conf.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS));
-    this.processDataExpiringExecutor =
-        new ProcessDataExpiringExecutor(
-            tableService, optimizingKeepTime, expireInterval, processKeepTime);
     this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService);
     this.tableRefreshingExecutor =
         new TableRuntimeRefreshExecutor(
@@ -77,8 +57,4 @@ public class InlineTableExecutors {
   public OptimizingCommitExecutor getOptimizingCommitExecutor() {
     return optimizingCommitExecutor;
   }
-
-  public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() {
-    return processDataExpiringExecutor;
-  }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index 07898363a..1e770e359 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -385,8 +385,6 @@ public class AmsEnvironment {
         + "\n"
         + "  self-optimizing:\n"
         + "    commit-thread-count: 10\n"
-        + "    runtime-data-keep-days: 30\n"
-        + "    runtime-data-expire-interval-hours: 1\n"
         + "    break-quota-limit-enabled: true\n"
         + "\n"
         + "  database:\n"
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
index 414f760c2..63a308166 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
@@ -75,32 +75,6 @@ public class TestAmoroManagementConf {
     assertTimeRelatedConfigs(serviceConfig, expectedConfig);
   }
 
-  @Test
-  void testNewDurationConfigDefaults() {
-    Configurations serviceConfig = new Configurations();
-    Assertions.assertEquals(
-        Duration.ofDays(30),
-        
serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_TIME));
-    Assertions.assertEquals(
-        Duration.ofHours(1),
-        
serviceConfig.get(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL));
-    Assertions.assertEquals(
-        Duration.ofDays(7), 
serviceConfig.get(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_TIME));
-  }
-
-  @Test
-  void testDeprecatedIntegerConfigDefaults() {
-    Configurations serviceConfig = new Configurations();
-    Assertions.assertEquals(
-        30, 
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS));
-    Assertions.assertEquals(
-        1,
-        serviceConfig.getInteger(
-            
AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS));
-    Assertions.assertEquals(
-        7, 
serviceConfig.getInteger(AmoroManagementConf.PROCESS_HISTORY_DATA_KEEP_DAYS));
-  }
-
   @Test
   void testParsingDefaultStorageRelatedConfigs() {
     Configurations serviceConfig = new Configurations();
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
index 45aa9e9bf..e3f1c73f4 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -55,6 +55,8 @@ public class TestIcebergProcessFactory {
         "clean-dangling-delete-files", IcebergActions.CLEAN_DANGLING_DELETE, 
Duration.ofHours(24));
     assertSupportedAction("expire-data", IcebergActions.EXPIRE_DATA, 
Duration.ofHours(24));
     assertSupportedAction("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, 
Duration.ofHours(1));
+    assertSupportedAction(
+        "expire-process-data", IcebergActions.EXPIRE_PROCESS_DATA, 
Duration.ofHours(1));
   }
 
   @Test
@@ -73,6 +75,11 @@ public class TestIcebergProcessFactory {
         "auto-create-tags", IcebergActions.AUTO_CREATE_TAGS, 
TagsAutoCreatingProcess.class, 0);
     assertTriggerWhenDue(
         "sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, 
HiveCommitSyncProcess.class, 0);
+    assertTriggerWhenDue(
+        "expire-process-data",
+        IcebergActions.EXPIRE_PROCESS_DATA,
+        ProcessDataExpiringProcess.class,
+        0);
   }
 
   @Test
@@ -134,6 +141,14 @@ public class TestIcebergProcessFactory {
     assertRecover("sync-hive-tables", IcebergActions.SYNC_HIVE_TABLES, 
HiveCommitSyncProcess.class);
   }
 
+  @Test
+  public void testRecoverExpireProcessDataProcess() {
+    assertRecover(
+        "expire-process-data",
+        IcebergActions.EXPIRE_PROCESS_DATA,
+        ProcessDataExpiringProcess.class);
+  }
+
   @Test
   public void testRecoverUnsupportedActionThrows() {
     IcebergProcessFactory factory = openedFactory("expire-snapshots");
@@ -180,12 +195,20 @@ public class TestIcebergProcessFactory {
     Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME, 
process.getExecutionEngine());
   }
 
+  private Map<String, String> buildFactoryProperties(String configKey, String 
interval) {
+    Map<String, String> properties = new HashMap<>();
+    if ("expire-process-data".equals(configKey)) {
+      properties.put("expire-process-data.runtime-data-expire-interval", 
interval);
+    } else {
+      properties.put(configKey + ".enabled", "true");
+      properties.put(configKey + ".interval", interval);
+    }
+    return properties;
+  }
+
   private IcebergProcessFactory openedFactory(String configKey) {
     IcebergProcessFactory factory = new IcebergProcessFactory();
-    Map<String, String> properties = new HashMap<>();
-    properties.put(configKey + ".enabled", "true");
-    properties.put(configKey + ".interval", "1h");
-    factory.open(properties);
+    factory.open(buildFactoryProperties(configKey, "1h"));
     return factory;
   }
 
@@ -193,9 +216,7 @@ public class TestIcebergProcessFactory {
       String configKey, org.apache.amoro.Action action, Duration interval) {
     IcebergProcessFactory factory = new IcebergProcessFactory();
 
-    Map<String, String> properties = new HashMap<>();
-    properties.put(configKey + ".enabled", "true");
-    properties.put(configKey + ".interval", interval.toHours() + "h");
+    Map<String, String> properties = buildFactoryProperties(configKey, 
interval.toHours() + "h");
 
     factory.open(properties);
 
@@ -212,10 +233,7 @@ public class TestIcebergProcessFactory {
       String configKey, org.apache.amoro.Action action, Class<?> processClass, 
long lastTime) {
     IcebergProcessFactory factory = new IcebergProcessFactory();
 
-    Map<String, String> properties = new HashMap<>();
-    properties.put(configKey + ".enabled", "true");
-    properties.put(configKey + ".interval", "1h");
-    factory.open(properties);
+    factory.open(buildFactoryProperties(configKey, "1h"));
 
     LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
     doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
similarity index 87%
rename from 
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
rename to 
amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
index 8e2139ba6..2f714d8e0 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestProcessDataExpiringExecutor.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestProcessDataExpiringProcess.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.scheduler.inline;
+package org.apache.amoro.server.process.iceberg;
 
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.process.LocalExecutionEngine;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.server.AMSServiceTestBase;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,7 +37,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 
-public class TestProcessDataExpiringExecutor extends AMSServiceTestBase {
+public class TestProcessDataExpiringProcess extends AMSServiceTestBase {
 
   private static final long TABLE_ID = 1L;
   private static final ServerTableIdentifier TABLE_IDENTIFIER =
@@ -46,12 +46,13 @@ public class TestProcessDataExpiringExecutor extends 
AMSServiceTestBase {
 
   private final Persistency persistency = new Persistency();
   private DefaultTableRuntime tableRuntime;
-  private TableService tableService;
+  private LocalExecutionEngine engine;
 
   @Before
   public void mock() {
     tableRuntime = Mockito.mock(DefaultTableRuntime.class);
-    tableService = Mockito.mock(TableService.class);
+    engine = Mockito.mock(LocalExecutionEngine.class);
+    Mockito.when(engine.name()).thenReturn(LocalExecutionEngine.ENGINE_NAME);
     
Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(TABLE_IDENTIFIER);
     // Clean up any leftover data
     persistency.cleanAll(TABLE_ID);
@@ -61,7 +62,6 @@ public class TestProcessDataExpiringExecutor extends 
AMSServiceTestBase {
   public void testProcessHistoryExpiringWhenShorterThanKeepTime() {
     // optimizingKeepTime=30d, processKeepTime=7d
     Duration optimizingKeepTime = Duration.ofDays(30);
-    Duration expireInterval = Duration.ofHours(1);
     Duration processKeepTime = Duration.ofDays(7);
 
     long now = System.currentTimeMillis();
@@ -78,10 +78,10 @@ public class TestProcessDataExpiringExecutor extends 
AMSServiceTestBase {
 
     Assert.assertEquals(2, persistency.listProcesses(TABLE_ID).size());
 
-    ProcessDataExpiringExecutor executor =
-        new ProcessDataExpiringExecutor(
-            tableService, optimizingKeepTime, expireInterval, processKeepTime);
-    executor.execute(tableRuntime);
+    ProcessDataExpiringProcess process =
+        new ProcessDataExpiringProcess(
+            tableRuntime, engine, optimizingKeepTime.toMillis(), 
processKeepTime.toMillis());
+    process.run();
 
     List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID);
     Assert.assertEquals(1, remaining.size());
@@ -92,7 +92,6 @@ public class TestProcessDataExpiringExecutor extends 
AMSServiceTestBase {
   public void testProcessHistoryNotExpiringWhenEqualToKeepTime() {
     // When processKeepTime >= optimizingKeepTime, the extra process cleanup 
should not trigger
     Duration optimizingKeepTime = Duration.ofDays(7);
-    Duration expireInterval = Duration.ofHours(1);
     Duration processKeepTime = Duration.ofDays(7);
 
     long now = System.currentTimeMillis();
@@ -104,10 +103,10 @@ public class TestProcessDataExpiringExecutor extends 
AMSServiceTestBase {
 
     Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size());
 
-    ProcessDataExpiringExecutor executor =
-        new ProcessDataExpiringExecutor(
-            tableService, optimizingKeepTime, expireInterval, processKeepTime);
-    executor.execute(tableRuntime);
+    ProcessDataExpiringProcess process =
+        new ProcessDataExpiringProcess(
+            tableRuntime, engine, optimizingKeepTime.toMillis(), 
processKeepTime.toMillis());
+    process.run();
 
     // The process should still exist - not expired by either mechanism
     Assert.assertEquals(1, persistency.listProcesses(TABLE_ID).size());
@@ -117,7 +116,6 @@ public class TestProcessDataExpiringExecutor extends 
AMSServiceTestBase {
   public void testDeleteExpiredProcessesSkipsActiveStatuses() {
     // optimizingKeepTime=30d, processKeepTime=7d
     Duration optimizingKeepTime = Duration.ofDays(30);
-    Duration expireInterval = Duration.ofHours(1);
     Duration processKeepTime = Duration.ofDays(7);
 
     long now = System.currentTimeMillis();
@@ -145,10 +143,10 @@ public class TestProcessDataExpiringExecutor extends 
AMSServiceTestBase {
 
     Assert.assertEquals(6, persistency.listProcesses(TABLE_ID).size());
 
-    ProcessDataExpiringExecutor executor =
-        new ProcessDataExpiringExecutor(
-            tableService, optimizingKeepTime, expireInterval, processKeepTime);
-    executor.execute(tableRuntime);
+    ProcessDataExpiringProcess process =
+        new ProcessDataExpiringProcess(
+            tableRuntime, engine, optimizingKeepTime.toMillis(), 
processKeepTime.toMillis());
+    process.run();
 
     List<TableProcessMeta> remaining = persistency.listProcesses(TABLE_ID);
     // Active statuses (RUNNING, SUBMITTED, PENDING, CANCELING) should survive
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
deleted file mode 100644
index 4a7c2895d..000000000
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler.inline;
-
-import org.apache.amoro.TableRuntime;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.time.Duration;
-
-/** Tests for configurable interval in ProcessDataExpiringExecutor. */
-public class TestConfigurableIntervalExecutors {
-
-  @Test
-  public void testProcessDataExpiringDefaultInterval() {
-    Duration optimizingKeepTime = Duration.ofDays(30);
-    Duration expireInterval = Duration.ofHours(1);
-    Duration processKeepTime = Duration.ofDays(7);
-    ProcessDataExpiringExecutor executor =
-        new ProcessDataExpiringExecutor(null, optimizingKeepTime, expireInterval, processKeepTime);
-
-    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
-    Assert.assertEquals(
-        Duration.ofHours(1).toMillis(), executor.getNextExecutingTime(tableRuntime));
-  }
-
-  @Test
-  public void testProcessDataExpiringCustomInterval() {
-    Duration optimizingKeepTime = Duration.ofDays(15);
-    Duration expireInterval = Duration.ofMinutes(30);
-    Duration processKeepTime = Duration.ofDays(3);
-    ProcessDataExpiringExecutor executor =
-        new ProcessDataExpiringExecutor(null, optimizingKeepTime, expireInterval, processKeepTime);
-
-    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
-    Assert.assertEquals(
-        Duration.ofMinutes(30).toMillis(), executor.getNextExecutingTime(tableRuntime));
-  }
-}
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index b454bc9c1..89bdd055a 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -31,4 +31,5 @@ public class IcebergActions {
   public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots");
   public static final Action CLEAN_DANGLING_DELETE = Action.register("clean-dangling-delete-files");
   public static final Action AUTO_CREATE_TAGS = Action.register("auto-create-tags");
+  public static final Action EXPIRE_PROCESS_DATA = Action.register("expire-process-data");
 }
diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
index fb91cb63b..f3568cf03 100644
--- a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
+++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -51,6 +51,7 @@ public class TestLocalExecutionEngine {
     assertCustomPoolByTag("expire-data");
     assertCustomPoolByTag("auto-create-tags");
     assertCustomPoolByTag("sync-hive-tables");
+    assertCustomPoolByTag("expire-process-data");
   }
 
   private void assertCustomPoolByTag(String tag) throws Exception {
@@ -162,6 +163,7 @@ public class TestLocalExecutionEngine {
     properties.put("pool.expire-data.thread-count", "1");
     properties.put("pool.auto-create-tags.thread-count", "1");
     properties.put("pool.sync-hive-tables.thread-count", "1");
+    properties.put("pool.expire-process-data.thread-count", "1");
     properties.put("process.status.ttl", ttl);
     localEngine.open(properties);
     return localEngine;
diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml
index f4cdaaac8..ae2ee8d39 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -81,13 +81,8 @@ ams:
     interval: 1min # 60000
     max-pending-partition-count: 100 # default 100
 
-  process:
-    history-data-keep-time: 7d
-
   self-optimizing:
     commit-thread-count: 10
-    runtime-data-keep-time: 30d
-    runtime-data-expire-interval: 1h
     refresh-group-interval: 30s
     break-quota-limit-enabled: true
 
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 14ebcc4e9..dcb5fd934 100755
--- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -26,4 +26,5 @@ execute-engines:
       pool.clean-dangling-delete-files.thread-count: 10
       pool.expire-data.thread-count: 10
       pool.auto-create-tags.thread-count: 3
-      pool.sync-hive-tables.thread-count: 10
\ No newline at end of file
+      pool.sync-hive-tables.thread-count: 10
+      pool.expire-process-data.thread-count: 1
\ No newline at end of file
diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index 1085d364f..6bb8c3830 100755
--- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -32,3 +32,6 @@ process-factories:
       auto-create-tags.interval: "1min"
       sync-hive-tables.enabled: "false"
       sync-hive-tables.interval: "10min"
+      expire-process-data.runtime-data-expire-interval: "1h"
+      expire-process-data.runtime-data-keep-time: "30d"
+      expire-process-data.history-data-keep-time: "7d"
diff --git a/docs/admin-guides/deployment.md b/docs/admin-guides/deployment.md
index 14f32149f..1217567bb 100644
--- a/docs/admin-guides/deployment.md
+++ b/docs/admin-guides/deployment.md
@@ -282,6 +282,9 @@ process-factories:
       auto-create-tags.interval: "1m"        # interval for auto creating tags
       sync-hive-tables.enabled: "false"             # enable synchronizing Hive tables
       sync-hive-tables.interval: "10min"            # interval for synchronizing Hive tables
+      expire-process-data.runtime-data-expire-interval: "1h"            # interval for expiring process data
+      expire-process-data.runtime-data-keep-time: "30d"    # duration to keep optimizing runtime data
+      expire-process-data.history-data-keep-time: "7d"           # duration to keep terminal process history records
 ```
 
 {{< hint info >}}
@@ -316,6 +319,7 @@ execute-engines:
       pool.expire-data.thread-count: 10               # thread pool for data expiration
       pool.auto-create-tags.thread-count: 3           # thread pool for auto creating tags
       pool.sync-hive-tables.thread-count: 10          # thread pool for synchronizing Hive tables
+      pool.expire-process-data.thread-count: 1      # thread pool for expiring process data
       process.status.ttl: 4h                          # TTL for process status cache
 ```
 
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 2a6c6b6e8..96c630070 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -97,8 +97,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
 | optimizer.task-execute-timeout | 2147483647 s | Timeout duration for task execution, default to Integer.MAX_VALUE seconds(about 24,855 days). |
 | overview-cache.max-size | 3360 | Max size of overview cache. |
 | overview-cache.refresh-interval | 3 min | Interval for refreshing overview cache. |
-| process.history-data-keep-days | 7 | Deprecated: use 'process.history-data-keep-time' instead. The number of days that process history data is retained. |
-| process.history-data-keep-time | 7 d | Duration that process history data is retained. Expired terminal process records will be deleted automatically. |
 | refresh-external-catalogs.interval | 3 min | Interval to refresh the external catalog. |
 | refresh-external-catalogs.queue-size | 1000000 | The queue size of the executors of the external catalog explorer. |
 | refresh-external-catalogs.thread-count | 10 | The number of threads used for discovering tables in external catalogs. |
@@ -110,10 +108,6 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
 | self-optimizing.commit-thread-count | 10 | The number of threads that self-optimizing uses to submit results. |
 | self-optimizing.plan-manifest-io-thread-count | 10 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning operations. |
 | self-optimizing.refresh-group-interval | 30 s | Optimizer group refresh interval. |
-| self-optimizing.runtime-data-expire-interval | 1 h | Interval between self-optimizing runtime data expiration runs. |
-| self-optimizing.runtime-data-expire-interval-hours | 1 | Deprecated: use 'self-optimizing.runtime-data-expire-interval' instead. The number of hours that self-optimizing runtime data expire interval. |
-| self-optimizing.runtime-data-keep-days | 30 | Deprecated: use 'self-optimizing.runtime-data-keep-time' instead. The number of days that self-optimizing runtime data keeps the runtime. |
-| self-optimizing.runtime-data-keep-time | 30 d | Duration that self-optimizing runtime data is retained. |
 | server-bind-host | 0.0.0.0 | The host bound to the server. |
 | server-expose-host |  | The exposed host of the server. |
 | table-manifest-io.thread-count | 20 | Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing manifests in the base table implementation across all concurrent planning or commit operations. |


Reply via email to