This is an automated email from the ASF dual-hosted git repository.

nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 743e6f1cd [Improvement]: Refactor process API and table runtime 
extension interfaces (#4097)
743e6f1cd is described below

commit 743e6f1cde34ec5b0dcc6b93503557c2daed2b12
Author: baiyangtx <[email protected]>
AuthorDate: Wed Feb 25 20:02:28 2026 +0800

    [Improvement]: Refactor process API and table runtime extension interfaces 
(#4097)
    
    * improvement: refactor process API and table runtime extension interfaces
    
    # Context
    
    Split from upstream PR apache/amoro#4081 to make review easier.
    
    This MR introduces the new process extension APIs in common module, without 
touching most AMS internals yet.
    
    # Changes
    
    - Promote `ProcessFactory` to a richer abstraction in `amoro-common`, 
preparing for plugin-based table process extension.
    - Adjust `Action`/`IcebergActions` and `TableRuntime` to align with the new 
process model.
    - Move `ActionCoordinator` into `amoro-common` so it can be shared as a 
public abstraction.
    - Introduce `ProcessTriggerStrategy` to describe trigger policies for 
processes.
    - Clean up legacy process state classes which will be replaced by the new 
abstractions in follow-up MRs.
    
    # Notes
    
    - This commit only updates the common module and the shared 
`ActionCoordinator` API; AMS-side wiring and runtime refactors will be done in 
separate branches/MRs as discussed.
    
    Co-Authored-By: Aime <[email protected]>
    Change-Id: If84ada8fcae1cfb11577d56d3866db7ce0949102
    
    * fix compile error
    
    * fix compile error
    
    * fix compile error3
    
    * getTableConfig
    
    * refactor
    
    ---------
    
    Co-authored-by: zhangyongxiang.alpha <[email protected]>
    Co-authored-by: Aime <[email protected]>
---
 .../persistence/mapper/ProcessStateMapper.java     |  87 -------
 .../server/process/ActionCoordinatorScheduler.java |  17 +-
 .../amoro/server/process/ProcessService.java       |   3 +-
 .../amoro/server/process/TableProcessMeta.java     |  20 --
 .../scheduler/PeriodicExternalScheduler.java       | 278 ---------------------
 .../amoro/server/table/AbstractTableRuntime.java   |  67 ++++-
 .../amoro/server/table/DefaultTableRuntime.java    |  74 +-----
 .../server/table/DefaultTableRuntimeFactory.java   |  11 +
 .../server/table/SupportProcessManagement.java     |  32 +--
 .../server/process/MockActionCoordinator.java      |  32 +--
 .../src/main/java/org/apache/amoro/Action.java     |  41 ++-
 .../main/java/org/apache/amoro/IcebergActions.java |  10 +-
 .../main/java/org/apache/amoro/TableRuntime.java   |  11 +-
 .../apache/amoro}/process/ActionCoordinator.java   |  37 +--
 .../org/apache/amoro/process/AmoroProcess.java     |   2 +-
 .../org/apache/amoro/process/OptimizingState.java  |  80 ------
 .../org/apache/amoro/process/ProcessFactory.java   |  33 ++-
 .../org/apache/amoro/process/ProcessState.java     |  77 ------
 .../amoro/process/ProcessTriggerStrategy.java      |  57 +++++
 .../RecoverProcessFailedException.java}            |  24 +-
 .../apache/amoro/process/TableProcessState.java    | 224 -----------------
 .../apache/amoro/table/TableRuntimeFactory.java    |   6 +
 22 files changed, 240 insertions(+), 983 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
deleted file mode 100644
index f7b755781..000000000
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.persistence.mapper;
-
-import org.apache.amoro.process.TableProcessState;
-import org.apache.ibatis.annotations.Insert;
-import org.apache.ibatis.annotations.Options;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Result;
-import org.apache.ibatis.annotations.ResultMap;
-import org.apache.ibatis.annotations.Results;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
-
-import java.util.Map;
-
-public interface ProcessStateMapper {
-
-  @Insert(
-      "INSERT INTO table_process_state "
-          + "(process_id, action, table_id, retry_num, status, start_time, 
end_time, fail_reason, summary) "
-          + "VALUES "
-          + "(#{id}, #{action}, #{tableId}, #{retryNumber}, #{status}, 
#{startTime}, #{endTime}, #{failedReason}, #{summary})")
-  @Options(useGeneratedKeys = true, keyProperty = "id")
-  void createProcessState(TableProcessState state);
-
-  @Update(
-      "UPDATE table_process_state "
-          + "SET status = #{status}, start_time = #{startTime} "
-          + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
-  void updateProcessRunning(TableProcessState state);
-
-  @Update(
-      "UPDATE table_process_state "
-          + "SET status = #{status}, end_time = #{endTime} "
-          + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
-  void updateProcessCompleted(TableProcessState state);
-
-  @Update(
-      "UPDATE table_process_state "
-          + "SET status = #{status}, end_time = #{endTime}, fail_reason = 
#{failedReason} "
-          + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
-  void updateProcessFailed(TableProcessState state);
-
-  @Select(
-      "SELECT process_id, action, table_id, retry_num, status, start_time, 
end_time, fail_reason, summary "
-          + "FROM table_process_state "
-          + "WHERE process_id = #{processId}")
-  @Results(
-      id = "TableProcessStateResultMap",
-      value = {
-        @Result(property = "id", column = "process_id"),
-        @Result(property = "action", column = "action"),
-        @Result(property = "tableId", column = "table_id"),
-        @Result(property = "retryNumber", column = "retry_num"),
-        @Result(property = "status", column = "status"),
-        @Result(property = "startTime", column = "start_time"),
-        @Result(property = "endTime", column = "end_time"),
-        @Result(property = "failedReason", column = "fail_reason"),
-        @Result(property = "summary", column = "summary", javaType = Map.class)
-      })
-  TableProcessState getProcessStateById(@Param("processId") long processId);
-
-  /** Query TableProcessState by table_id */
-  @Select(
-      "SELECT process_id, action, table_id, retry_num, status, start_time, 
end_time, fail_reason, summary "
-          + "FROM table_process_state "
-          + "WHERE table_id = #{tableId}")
-  @ResultMap("TableProcessStateResultMap")
-  TableProcessState getProcessStateByTableId(@Param("tableId") long tableId);
-}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
index 83574c663..daf586941 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server.process;
 
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
 import org.apache.amoro.process.TableProcess;
 import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
@@ -27,6 +28,8 @@ import org.apache.amoro.server.table.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 /**
  * Periodic scheduler that delegates scheduling decisions to an {@link 
ActionCoordinator}. It
  * creates, recovers and retries table processes via {@link ProcessService}.
@@ -95,8 +98,8 @@ public class ActionCoordinatorScheduler extends 
PeriodicTableScheduler {
    */
   @Override
   protected void execute(TableRuntime tableRuntime) {
-    TableProcess process = coordinator.createTableProcess(tableRuntime);
-    processService.register(tableRuntime, process);
+    Optional<TableProcess> process = coordinator.trigger(tableRuntime);
+    process.ifPresent(p -> processService.register(tableRuntime, p));
   }
 
   /**
@@ -110,16 +113,6 @@ public class ActionCoordinatorScheduler extends 
PeriodicTableScheduler {
     processService.recover(tableRuntime, process);
   }
 
-  /**
-   * Retry a failed table process.
-   *
-   * @param process process to retry
-   */
-  protected void retry(TableProcess process) {
-    process = coordinator.retryTableProcess(process);
-    processService.retry(process);
-  }
-
   /**
    * Get executor delay from coordinator.
    *
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index ee9f136dc..252d1574b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -25,6 +25,7 @@ import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.ActionCoordinator;
 import org.apache.amoro.process.ProcessEvent;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.process.TableProcess;
@@ -242,7 +243,7 @@ public class ProcessService extends PersistentBase {
                     "Regular Retry.",
                     process.getProcessParameters(),
                     process.getSummary());
-            scheduler.retry(process);
+            executeOrTraceProcess(process);
           } else {
             untrackTableProcessInstance(
                 process.getTableRuntime().getTableIdentifier(), 
process.getId());
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
index 311b60e03..6890c25c1 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
@@ -19,7 +19,6 @@
 package org.apache.amoro.server.process;
 
 import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcessState;
 import org.apache.amoro.process.TableProcessStore;
 
 import java.util.HashMap;
@@ -189,25 +188,6 @@ public class TableProcessMeta {
     return tableProcessMeta;
   }
 
-  @Deprecated
-  public static TableProcessMeta fromTableProcessState(TableProcessState 
tableProcessState) {
-    TableProcessMeta tableProcessMeta = new TableProcessMeta();
-    tableProcessMeta.setProcessId(tableProcessState.getId());
-    
tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId());
-    
tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier());
-    tableProcessMeta.setStatus(tableProcessState.getStatus());
-    tableProcessMeta.setProcessType(tableProcessState.getAction().getName());
-    tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc());
-    
tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine());
-    tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber());
-    tableProcessMeta.setCreateTime(tableProcessState.getStartTime());
-    tableProcessMeta.setFinishTime(tableProcessState.getEndTime());
-    tableProcessMeta.setFailMessage(tableProcessState.getFailedReason());
-    
tableProcessMeta.setProcessParameters(tableProcessState.getProcessParameters());
-    tableProcessMeta.setSummary(tableProcessState.getSummary());
-    return tableProcessMeta;
-  }
-
   public static TableProcessMeta of(
       long processId,
       long tableId,
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
deleted file mode 100644
index 346f02772..000000000
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler;
-
-import org.apache.amoro.Action;
-import org.apache.amoro.SupportsProcessPlugins;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.process.AmoroProcess;
-import org.apache.amoro.process.ManagedProcess;
-import org.apache.amoro.process.ProcessFactory;
-import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.SimpleFuture;
-import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessState;
-import org.apache.amoro.process.TableProcessStore;
-import org.apache.amoro.resource.ExternalResourceContainer;
-import org.apache.amoro.resource.Resource;
-import org.apache.amoro.resource.ResourceManager;
-import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
-import org.apache.amoro.server.process.DefaultTableProcessStore;
-import org.apache.amoro.server.process.TableProcessMeta;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
-
-import java.util.List;
-import java.util.Optional;
-
-public abstract class PeriodicExternalScheduler extends PeriodicTableScheduler 
{
-
-  private final ExternalResourceContainer resourceContainer;
-  private final ResourceManager resourceManager;
-  private final ProcessFactory processFactory;
-
-  public PeriodicExternalScheduler(
-      ResourceManager resourceManager,
-      ExternalResourceContainer resourceContainer,
-      Action action,
-      TableService tableService,
-      int poolSize) {
-    super(action, tableService, poolSize);
-    this.resourceContainer = resourceContainer;
-    this.resourceManager = resourceManager;
-    this.processFactory = generateProcessFactory();
-  }
-
-  @Override
-  protected void initHandler(List<TableRuntime> tableRuntimeList) {
-    tableRuntimeList.stream()
-        .filter(t -> t instanceof SupportsProcessPlugins)
-        .map(t -> (SupportsProcessPlugins) t)
-        .forEach(tableRuntime -> tableRuntime.install(getAction(), 
processFactory));
-    super.initHandler(tableRuntimeList);
-  }
-
-  @Override
-  protected boolean enabled(TableRuntime tableRuntime) {
-    return Optional.of(tableRuntime)
-        .filter(t -> t instanceof SupportsProcessPlugins)
-        .map(t -> (SupportsProcessPlugins) t)
-        .map(t -> t.enabled(getAction()))
-        .orElse(false);
-  }
-
-  @Override
-  protected void execute(TableRuntime tableRuntime) {
-    Preconditions.checkArgument(tableRuntime instanceof 
SupportsProcessPlugins);
-    SupportsProcessPlugins runtimeSupportProcessPlugin = 
(SupportsProcessPlugins) tableRuntime;
-    // Trigger a table process and check conflicts by table runtime
-    // Update process state after process completed, the callback must be 
register first
-    AmoroProcess process = runtimeSupportProcessPlugin.trigger(getAction());
-    process.getCompleteFuture().whenCompleted(() -> 
persistTableProcess(process));
-    ManagedProcess managedProcess = new ManagedTableProcess(process);
-
-    // Submit the table process to resource manager, this is a sync operation
-    // update process completed and delete related resources
-    managedProcess.submit();
-
-    // Trace the table process by async framework so that process can be 
called back when completed
-    trace(process);
-  }
-
-  protected int getMaxRetryNumber() {
-    return 1;
-  }
-
-  protected abstract void trace(AmoroProcess process);
-
-  protected ProcessFactory generateProcessFactory() {
-    return new ExternalProcessFactory();
-  }
-
-  protected void persistTableProcess(AmoroProcess process) {
-    TableProcessStore store = process.store();
-    if (store.getStatus() == ProcessStatus.SUBMITTED) {
-      new PersistencyHelper().createProcessState(store);
-    } else {
-      new PersistencyHelper().updateProcessStatus(store);
-    }
-  }
-
-  private static class PersistencyHelper extends PersistentBase {
-
-    void createProcessState(TableProcessStore store) {
-      TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store);
-      doAs(
-          TableProcessMapper.class,
-          mapper ->
-              mapper.updateProcess(
-                  meta.getTableId(),
-                  meta.getProcessId(),
-                  meta.getExternalProcessIdentifier(),
-                  meta.getStatus(),
-                  meta.getProcessStage(),
-                  meta.getRetryNumber(),
-                  meta.getFinishTime(),
-                  meta.getFailMessage(),
-                  meta.getProcessParameters(),
-                  meta.getSummary()));
-    }
-
-    void updateProcessStatus(TableProcessStore store) {
-      TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store);
-      doAs(
-          TableProcessMapper.class,
-          mapper ->
-              mapper.updateProcess(
-                  meta.getTableId(),
-                  meta.getProcessId(),
-                  meta.getExternalProcessIdentifier(),
-                  meta.getStatus(),
-                  meta.getProcessStage(),
-                  meta.getRetryNumber(),
-                  meta.getFinishTime(),
-                  meta.getFailMessage(),
-                  meta.getProcessParameters(),
-                  meta.getSummary()));
-    }
-  }
-
-  private class ExternalTableProcess extends TableProcess {
-
-    ExternalTableProcess(TableRuntime tableRuntime) {
-      super(
-          tableRuntime,
-          new DefaultTableProcessStore(
-              tableRuntime, new TableProcessMeta(), 
PeriodicExternalScheduler.this.getAction()));
-    }
-
-    ExternalTableProcess(TableRuntime tableRuntime, TableProcessState state) {
-      super(
-          tableRuntime,
-          new DefaultTableProcessStore(
-              tableRuntime,
-              TableProcessMeta.fromTableProcessState(state),
-              PeriodicExternalScheduler.this.getAction()));
-    }
-
-    @Override
-    protected void closeInternal() {}
-  }
-
-  private class ExternalProcessFactory implements ProcessFactory {
-
-    @Override
-    public AmoroProcess create(TableRuntime tableRuntime, Action action) {
-      return new ExternalTableProcess(tableRuntime);
-    }
-
-    @Override
-    public AmoroProcess recover(TableRuntime tableRuntime, TableProcessState 
state) {
-      return new ExternalTableProcess(tableRuntime, state);
-    }
-  }
-
-  protected class ManagedTableProcess implements ManagedProcess {
-
-    private final AmoroProcess process;
-
-    ManagedTableProcess(AmoroProcess process) {
-      this.process = process;
-    }
-
-    @Override
-    public void submit() {
-      Resource resource = resourceContainer.submit(this);
-      if (resource == null) {
-        throw new IllegalStateException("Submit table process can not return 
null resource");
-      }
-      persistTableProcess(this);
-      resourceManager.createResource(resource);
-      getCompleteFuture()
-          .whenCompleted(
-              () -> {
-                resourceManager.deleteResource(resource.getResourceId());
-                if (store().getStatus() == ProcessStatus.FAILED
-                    && store().getRetryNumber() < getMaxRetryNumber()) {
-                  retry();
-                }
-              });
-      
store().begin().updateTableProcessStatus(ProcessStatus.SUBMITTED).commit();
-      getSubmitFuture().complete();
-    }
-
-    @Override
-    public void complete() {
-      store()
-          .begin()
-          .updateTableProcessStatus(ProcessStatus.SUCCESS)
-          .updateFinishTime(System.currentTimeMillis())
-          .commit();
-      process.getCompleteFuture().complete();
-    }
-
-    @Override
-    public void complete(String failedReason) {
-      store()
-          .begin()
-          .updateTableProcessStatus(ProcessStatus.FAILED)
-          .updateTableProcessFailMessage(failedReason)
-          .updateFinishTime(System.currentTimeMillis())
-          .commit();
-      process.getCompleteFuture().complete();
-    }
-
-    @Override
-    public void retry() {
-      store()
-          .begin()
-          .updateTableProcessStatus(ProcessStatus.PENDING)
-          .updateRetryNumber(store().getRetryNumber() + 1)
-          .updateExternalProcessIdentifier("")
-          .commit();
-      submit();
-    }
-
-    @Override
-    public void kill() {
-      store()
-          .begin()
-          .updateTableProcessStatus(ProcessStatus.KILLED)
-          .updateFinishTime(System.currentTimeMillis())
-          .commit();
-      process.getCompleteFuture().complete();
-    }
-
-    @Override
-    public SimpleFuture getSubmitFuture() {
-      return process.getSubmitFuture();
-    }
-
-    @Override
-    public SimpleFuture getCompleteFuture() {
-      return process.getCompleteFuture();
-    }
-
-    @Override
-    public TableProcessStore store() {
-      return process.store();
-    }
-  }
-}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
index e07cd4432..05e17adc0 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
@@ -18,17 +18,26 @@
 
 package org.apache.amoro.server.table;
 
+import org.apache.amoro.Action;
 import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.SupportsProcessPlugins;
-import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
 import org.apache.amoro.table.TableRuntimeStore;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
 public abstract class AbstractTableRuntime extends PersistentBase
-    implements TableRuntime, SupportsProcessPlugins {
+    implements SupportProcessManagement {
 
   private final TableRuntimeStore store;
+  private final Map<Action, TableProcessContainer> processContainerMap = 
Maps.newConcurrentMap();
 
   protected AbstractTableRuntime(TableRuntimeStore store) {
     this.store = store;
@@ -48,6 +57,49 @@ public abstract class AbstractTableRuntime extends 
PersistentBase
     return TableConfigurations.parseTableConfig(store().getTableConfig());
   }
 
+  @Override
+  public Map<String, String> getTableConfig() {
+    return store().getTableConfig();
+  }
+
+  @Override
+  public List<TableProcessStore> getProcessStates() {
+    return processContainerMap.values().stream()
+        .flatMap(container -> container.getProcessStates().stream())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TableProcessStore> getProcessStates(Action action) {
+    return processContainerMap.get(action).getProcessStates();
+  }
+
+  @Override
+  public void registerProcess(TableProcessStore processStore) {
+    processContainerMap
+        .computeIfAbsent(processStore.getAction(), k -> new 
TableProcessContainer())
+        .processLock
+        .lock();
+    try {
+      processContainerMap
+          .get(processStore.getAction())
+          .processMap
+          .put(processStore.getProcessId(), processStore);
+    } finally {
+      processContainerMap.get(processStore.getAction()).processLock.unlock();
+    }
+  }
+
+  @Override
+  public void removeProcess(TableProcessStore processStore) {
+    processContainerMap.computeIfPresent(
+        processStore.getAction(),
+        (action, container) -> {
+          container.processMap.remove(processStore.getProcessId());
+          return container;
+        });
+  }
+
   @Override
   public String getGroupName() {
     return store().getGroupName();
@@ -61,4 +113,13 @@ public abstract class AbstractTableRuntime extends 
PersistentBase
   public void dispose() {
     store().dispose();
   }
+
+  private static class TableProcessContainer {
+    private final Lock processLock = new ReentrantLock();
+    private final Map<Long, TableProcessStore> processMap = 
Maps.newConcurrentMap();
+
+    public List<TableProcessStore> getProcessStates() {
+      return Lists.newArrayList(processMap.values());
+    }
+  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 11c1a21cf..8a337451c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -18,10 +18,7 @@
 
 package org.apache.amoro.server.table;
 
-import org.apache.amoro.Action;
 import org.apache.amoro.AmoroTable;
-import org.apache.amoro.SupportsProcessPlugins;
-import org.apache.amoro.TableRuntime;
 import org.apache.amoro.api.BlockableOperation;
 import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.config.TableConfiguration;
@@ -30,10 +27,7 @@ import org.apache.amoro.metrics.MetricRegistry;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.TableRuntimeOptimizingState;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
-import org.apache.amoro.process.AmoroProcess;
-import org.apache.amoro.process.ProcessFactory;
 import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.AmoroServiceConstants;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
@@ -48,7 +42,6 @@ import 
org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
 import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
-import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
 import org.apache.amoro.table.BaseTable;
 import org.apache.amoro.table.ChangeTable;
 import org.apache.amoro.table.MixedTable;
@@ -64,13 +57,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 /** Default table runtime implementation. */
-public class DefaultTableRuntime extends AbstractTableRuntime
-    implements TableRuntime, SupportsProcessPlugins {
+public class DefaultTableRuntime extends AbstractTableRuntime {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultTableRuntime.class);
 
@@ -95,8 +84,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime
   public static final List<StateKey<?>> REQUIRED_STATES =
       Lists.newArrayList(
           OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, 
CLEANUP_STATE_KEY);
-
-  private final Map<Action, TableProcessContainer> processContainerMap = 
Maps.newConcurrentMap();
   private final TableOptimizingMetrics optimizingMetrics;
   private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
   private final TableSummaryMetrics tableSummaryMetrics;
@@ -132,39 +119,6 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime
     this.tableSummaryMetrics.register(metricRegistry);
   }
 
-  @Override
-  public AmoroProcess trigger(Action action) {
-    return Optional.ofNullable(processContainerMap.get(action))
-        .map(container -> container.trigger(action))
-        // Define a related exception
-        .orElseThrow(() -> new IllegalArgumentException("No ProcessFactory for 
action " + action));
-  }
-
-  @Override
-  public void install(Action action, ProcessFactory processFactory) {
-    if (processContainerMap.putIfAbsent(action, new 
TableProcessContainer(processFactory))
-        != null) {
-      throw new IllegalStateException("ProcessFactory for action " + action + 
" already exists");
-    }
-  }
-
-  @Override
-  public boolean enabled(Action action) {
-    return processContainerMap.get(action) != null;
-  }
-
-  @Override
-  public List<TableProcessStore> getProcessStates() {
-    return processContainerMap.values().stream()
-        .flatMap(container -> container.getProcessStates().stream())
-        .collect(Collectors.toList());
-  }
-
-  @Override
-  public List<TableProcessStore> getProcessStates(Action action) {
-    return processContainerMap.get(action).getProcessStates();
-  }
-
   public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() {
     return orphanFilesCleaningMetrics;
   }
@@ -586,30 +540,4 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime
 
     return currentSnapshotId;
   }
-
-  private class TableProcessContainer {
-    private final Lock processLock = new ReentrantLock();
-    private final ProcessFactory processFactory;
-    private final Map<Long, AmoroProcess> processMap = Maps.newConcurrentMap();
-
-    TableProcessContainer(ProcessFactory processFactory) {
-      this.processFactory = processFactory;
-    }
-
-    public AmoroProcess trigger(Action action) {
-      processLock.lock();
-      try {
-        AmoroProcess process = processFactory.create(DefaultTableRuntime.this, 
action);
-        process.getCompleteFuture().whenCompleted(() -> 
processMap.remove(process.getId()));
-        processMap.put(process.getId(), process);
-        return process;
-      } finally {
-        processLock.unlock();
-      }
-    }
-
-    public List<TableProcessStore> getProcessStates() {
-      return 
processMap.values().stream().map(AmoroProcess::store).collect(Collectors.toList());
-    }
-  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
index 86e748cc5..41c8041f2 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
@@ -21,6 +21,9 @@ package org.apache.amoro.server.table;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.table.StateKey;
 import org.apache.amoro.table.TableRuntimeFactory;
 import org.apache.amoro.table.TableRuntimeStore;
@@ -41,6 +44,14 @@ public class DefaultTableRuntimeFactory implements 
TableRuntimeFactory {
     return "default";
   }
 
+  @Override
+  public List<ActionCoordinator> supportedCoordinators() {
+    return Lists.newArrayList();
+  }
+
+  @Override
+  public void initialize(List<ProcessFactory> factories) {}
+
   @Override
   public Optional<TableRuntimeCreator> accept(
       ServerTableIdentifier tableIdentifier, Map<String, String> 
tableProperties) {
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java
similarity index 60%
copy from 
amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
copy to 
amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java
index 7654af710..b6ff971b8 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java
@@ -16,25 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.table;
+package org.apache.amoro.server.table;
 
-import org.apache.amoro.ActivePlugin;
-import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.TableProcessStore;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+/** Interface for AMS inner used. */
+public interface SupportProcessManagement extends TableRuntime {
 
-/** Table runtime factory. */
-public interface TableRuntimeFactory extends ActivePlugin {
+  /**
+   * Register a process store to the table runtime.
+   *
+   * @param processStore the process store to register
+   */
+  void registerProcess(TableProcessStore processStore);
 
-  Optional<TableRuntimeCreator> accept(
-      ServerTableIdentifier tableIdentifier, Map<String, String> 
tableProperties);
-
-  interface TableRuntimeCreator {
-    List<StateKey<?>> requiredStateKeys();
-
-    TableRuntime create(TableRuntimeStore store);
-  }
+  /**
+   * Remove a process store from the table runtime.
+   *
+   * @param processStore the process store to remove
+   */
+  void removeProcess(TableProcessStore processStore);
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
index da3617a14..9d0f89b6b 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
@@ -21,18 +21,20 @@ package org.apache.amoro.server.process;
 import org.apache.amoro.Action;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
 import org.apache.amoro.process.TableProcess;
 import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /** Mock implementation of {@link ActionCoordinator} used in tests. */
 public class MockActionCoordinator implements ActionCoordinator {
   public static final int PROCESS_MAX_POOL_SIZE = 1000;
-  private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] 
{TableFormat.PAIMON};
-
-  public static final Action DEFAULT_ACTION = new Action(DEFAULT_FORMATS, 0, 
"default_action");
+  public static final Action DEFAULT_ACTION = 
Action.register("default_action");
+  public static final SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new 
SnowflakeIdGenerator();
 
   /**
    * Whether the format is supported.
@@ -61,12 +63,6 @@ public class MockActionCoordinator implements 
ActionCoordinator {
     return DEFAULT_ACTION;
   }
 
-  /** Get execution engine name. */
-  @Override
-  public String executionEngine() {
-    return "default";
-  }
-
   /** Next executing time. */
   @Override
   public long getNextExecutingTime(TableRuntime tableRuntime) {
@@ -92,19 +88,19 @@ public class MockActionCoordinator implements 
ActionCoordinator {
    * @return mock process
    */
   @Override
-  public TableProcess createTableProcess(TableRuntime tableRuntime) {
+  public Optional<TableProcess> trigger(TableRuntime tableRuntime) {
     TableProcessMeta tableProcessMeta =
         TableProcessMeta.of(
             SNOWFLAKE_ID_GENERATOR.generateId(),
             tableRuntime.getTableIdentifier().getId(),
             action().getName(),
-            executionEngine(),
+            "default",
             new HashMap<>());
     TableProcessStore tableProcessStore =
         new DefaultTableProcessStore(
             tableProcessMeta.getProcessId(), tableRuntime, tableProcessMeta, 
action(), 3);
     MockTableProcess mockTableProcess = new MockTableProcess(tableRuntime, 
tableProcessStore);
-    return mockTableProcess;
+    return Optional.of(mockTableProcess);
   }
 
   /**
@@ -120,18 +116,6 @@ public class MockActionCoordinator implements 
ActionCoordinator {
     return new MockTableProcess(tableRuntime, processStore);
   }
 
-  /** Return same process to cancel. */
-  @Override
-  public TableProcess cancelTableProcess(TableRuntime tableRuntime, 
TableProcess process) {
-    return process;
-  }
-
-  /** Return same process to retry. */
-  @Override
-  public TableProcess retryTableProcess(TableProcess process) {
-    return process;
-  }
-
   /** Open plugin. */
   @Override
   public void open(Map<String, String> properties) {}
diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java 
b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 42671a6e1..1d5f81b3b 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -20,38 +20,33 @@ package org.apache.amoro;
 
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 
-import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 public final class Action {
 
   private static final int MAX_NAME_LENGTH = 16;
-
-  /** supported table formats of this action */
-  private final TableFormat[] formats;
+  private static final Map<String, Action> registeredActions = new 
ConcurrentHashMap<>();
 
   private final String name;
-  /**
-   * the weight number of this action, the bigger the weight number, the 
higher positions of
-   * schedulers or front pages
-   */
-  private final int weight;
 
-  public Action(TableFormat[] formats, int weight, String name) {
-    Preconditions.checkArgument(
-        name.length() <= MAX_NAME_LENGTH,
-        "Action name length should be less than " + MAX_NAME_LENGTH);
-    this.formats = formats;
-    this.name = name;
-    this.weight = weight;
+  public static Action register(String name) {
+    final String regularName = name.trim().toUpperCase(Locale.ROOT);
+    return registeredActions.computeIfAbsent(regularName, s -> new 
Action(regularName));
   }
 
-  public int getWeight() {
-    return weight;
+  public static Action valueOf(String name) {
+    final String regularName = name.trim().toUpperCase(Locale.ROOT);
+    return registeredActions.get(regularName);
   }
 
-  public TableFormat[] supportedFormats() {
-    return formats;
+  private Action(String name) {
+    Preconditions.checkArgument(
+        name.length() <= MAX_NAME_LENGTH,
+        "Action name length should be less than " + MAX_NAME_LENGTH);
+    this.name = name;
   }
 
   public String getName() {
@@ -67,13 +62,11 @@ public final class Action {
       return false;
     }
     Action action = (Action) o;
-    return Objects.equals(name, action.name) && Arrays.equals(formats, 
action.formats);
+    return Objects.equals(name, action.name);
   }
 
   @Override
   public int hashCode() {
-    int result = Objects.hash(name);
-    result = 31 * result + Arrays.hashCode(formats);
-    return result;
+    return Objects.hash(name);
   }
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java 
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index 76f470d98..c75c5ac8d 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -23,9 +23,9 @@ public class IcebergActions {
   private static final TableFormat[] DEFAULT_FORMATS =
       new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, 
TableFormat.MIXED_HIVE};
 
-  public static final Action SYSTEM = new Action(DEFAULT_FORMATS, 0, "system");
-  public static final Action REWRITE = new Action(DEFAULT_FORMATS, 10, 
"rewrite");
-  public static final Action DELETE_ORPHANS = new Action(DEFAULT_FORMATS, 2, 
"delete-orphans");
-  public static final Action SYNC_HIVE = new Action(DEFAULT_FORMATS, 3, 
"sync-hive");
-  public static final Action EXPIRE_DATA = new Action(DEFAULT_FORMATS, 1, 
"expire-data");
+  public static final Action SYSTEM = Action.register("system");
+  public static final Action REWRITE = Action.register("rewrite");
+  public static final Action DELETE_ORPHANS = 
Action.register("delete-orphans");
+  public static final Action SYNC_HIVE = Action.register("sync-hive");
+  public static final Action EXPIRE_DATA = Action.register("expire-data");
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java 
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index 31ea74f1f..ae2f610cc 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -24,6 +24,7 @@ import org.apache.amoro.process.ProcessFactory;
 import org.apache.amoro.process.TableProcessStore;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * TableRuntime is the key interface for the AMS framework to interact with 
the table. Typically, it
@@ -59,12 +60,20 @@ public interface TableRuntime {
   ServerTableIdentifier getTableIdentifier();
 
   /**
-   * Get the table configuration.
+   * Get the table configuration. @Deprecated use {@link #getTableConfig()} 
instead.
    *
    * @return the table configuration
    */
+  @Deprecated
   TableConfiguration getTableConfiguration();
 
+  /**
+   * Get the table configuration.
+   *
+   * @return the table configuration
+   */
+  Map<String, String> getTableConfig();
+
   /**
    * Register the metric of the table runtime.
    *
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java
 b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java
similarity index 72%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java
rename to 
amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java
index 5e55154cb..1f00a1c4f 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java
@@ -16,15 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.process;
+package org.apache.amoro.process;
 
 import org.apache.amoro.Action;
 import org.apache.amoro.ActivePlugin;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
-import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessStore;
-import org.apache.amoro.server.utils.SnowflakeIdGenerator;
+
+import java.util.Optional;
 
 /**
  * Coordinator for a specific {@link org.apache.amoro.Action} to manage table 
processes. Provides
@@ -32,10 +31,6 @@ import org.apache.amoro.server.utils.SnowflakeIdGenerator;
  */
 public interface ActionCoordinator extends ActivePlugin {
 
-  String PROPERTY_PARALLELISM = "parallelism";
-
-  SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator();
-
   /**
    * Check whether the given table format is supported by this coordinator.
    *
@@ -58,13 +53,6 @@ public interface ActionCoordinator extends ActivePlugin {
    */
   Action action();
 
-  /**
-   * Get the execution engine name used by this coordinator.
-   *
-   * @return execution engine name
-   */
-  String executionEngine();
-
   /**
    * Calculate the next executing time for the given table runtime.
    *
@@ -94,7 +82,7 @@ public interface ActionCoordinator extends ActivePlugin {
    * @param tableRuntime table runtime
    * @return a new table process
    */
-  TableProcess createTableProcess(TableRuntime tableRuntime);
+  Optional<TableProcess> trigger(TableRuntime tableRuntime);
 
   /**
    * Recover a {@link TableProcess} from persisted store.
@@ -104,21 +92,4 @@ public interface ActionCoordinator extends ActivePlugin {
    * @return recovered table process
    */
   TableProcess recoverTableProcess(TableRuntime tableRuntime, 
TableProcessStore processStore);
-
-  /**
-   * Prepare a {@link TableProcess} for cancellation.
-   *
-   * @param tableRuntime table runtime
-   * @param process table process to cancel
-   * @return the process instance to be canceled
-   */
-  TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess 
process);
-
-  /**
-   * Prepare a {@link TableProcess} for retrying.
-   *
-   * @param process table process to retry
-   * @return the process instance to be retried
-   */
-  TableProcess retryTableProcess(TableProcess process);
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java 
b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
index cf0ffd50b..6ff5b6cfb 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
@@ -47,7 +47,7 @@ public interface AmoroProcess {
   SimpleFuture getCompleteFuture();
 
   /**
-   * Get {@link ProcessState} of the process
+   * Get {@link TableProcessStore} of the process
    *
    * @return the state of the process
    */
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java 
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
deleted file mode 100644
index c4a900dca..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-import org.apache.amoro.Action;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.StateField;
-
-/** The state of the optimizing process. */
-public abstract class OptimizingState extends TableProcessState {
-
-  @StateField private volatile long targetSnapshotId;
-  @StateField private volatile long watermark;
-  @StateField private volatile ProcessStage stage;
-  @StateField private volatile long currentStageStartTime;
-
-  public OptimizingState(Action action, ServerTableIdentifier tableIdentifier) 
{
-    super(action, tableIdentifier);
-  }
-
-  public OptimizingState(long id, Action action, ServerTableIdentifier 
tableIdentifier) {
-    super(id, action, tableIdentifier);
-  }
-
-  protected void setStage(ProcessStage stage) {
-    this.stage = stage;
-    this.currentStageStartTime = System.currentTimeMillis();
-  }
-
-  protected void setStage(ProcessStage stage, long stageStartTime) {
-    this.stage = stage;
-    this.currentStageStartTime = stageStartTime;
-  }
-
-  protected void setTargetSnapshotId(long targetSnapshotId) {
-    this.targetSnapshotId = targetSnapshotId;
-  }
-
-  protected void setWatermark(long watermark) {
-    this.watermark = watermark;
-  }
-
-  public long getWatermark() {
-    return watermark;
-  }
-
-  @Override
-  public ProcessStage getStage() {
-    return stage;
-  }
-
-  public long getTargetSnapshotId() {
-    return targetSnapshotId;
-  }
-
-  public long getCurrentStageStartTime() {
-    return currentStageStartTime;
-  }
-
-  @Override
-  public String getName() {
-    return stage.getDesc();
-  }
-}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java 
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
index a2df0a99f..c2e5c838a 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
@@ -19,30 +19,53 @@
 package org.apache.amoro.process;
 
 import org.apache.amoro.Action;
+import org.apache.amoro.ActivePlugin;
+import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.StateKey;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * A factory to create a process. Normally, There will be default 
ProcessFactories for each action
  * and used by default scheduler. Meanwhile, user could extend external 
ProcessFactory to run jobs
  * on external resources like Yarn.
  */
-public interface ProcessFactory {
+public interface ProcessFactory extends ActivePlugin {
+
+  default List<StateKey<?>> requiredStates() {
+    return Lists.newArrayList();
+  }
+
+  /** Get supported actions for each table format. */
+  Map<TableFormat, Set<Action>> supportedActions();
+
+  /** How to trigger a process for the action. */
+  default ProcessTriggerStrategy triggerStrategy(TableFormat format, Action 
action) {
+    return ProcessTriggerStrategy.METADATA_TRIGGER;
+  }
 
   /**
-   * Create a process for the action.
+   * Try trigger a process for the action.
    *
    * @param tableRuntime table runtime
    * @param action action type
    * @return target process which has not been submitted yet.
    */
-  AmoroProcess create(TableRuntime tableRuntime, Action action);
+  Optional<TableProcess> trigger(TableRuntime tableRuntime, Action action);
 
   /**
    * Recover a process for the action from a state.
    *
    * @param tableRuntime table runtime
-   * @param state state of the process
+   * @param store storage of the process
    * @return target process which has not been submitted yet.
+   * @throws RecoverProcessFailedException if recover failed
    */
-  AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state);
+  TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
+      throws RecoverProcessFailedException;
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java 
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java
deleted file mode 100644
index 91f76fcae..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-import org.apache.amoro.Action;
-
-import java.util.Map;
-
-/**
- * ProcessState contains information in any {@link AmoroProcess} which must be 
persistent and {@link
- * ProcessFactory} will use to recover {@link AmoroProcess}.
- */
-public interface ProcessState {
-
-  /** @return unique identifier of the process. */
-  long getId();
-
-  /**
-   * @return the name of the state. If multiple stages are involved, it should 
be the name of the
-   *     current stage.
-   */
-  String getName();
-
-  /** @return start time of the process. */
-  long getStartTime();
-
-  /** @return the action of the process. */
-  Action getAction();
-
-  /** @return the status of the process. */
-  ProcessStatus getStatus();
-
-  /**
-   * Get the string encoded summary of the process, this could be a simple 
description or a POJO
-   * encoded by JSON
-   *
-   * @return the summary of the process
-   */
-  Map<String, String> getSummary();
-
-  /** @return the reason of process failure, null if the process has not 
failed yet. */
-  String getFailedReason();
-
-  /**
-   * Total millisecond running time of all tasks in the process.
-   *
-   * @return actual quota runtime of the process.
-   */
-  long getQuotaRuntime();
-
-  /**
-   * Quota value is calculated by the total millisecond running time of all 
tasks in the process
-   * divided by the total millisecond from the start time to the current time. 
It is used to
-   * evaluate the actual runtime concurrence of the process.
-   *
-   * @return the quota value of the process.
-   */
-  default double getQuotaValue() {
-    return (double) getQuotaRuntime() / (System.currentTimeMillis() - 
getStartTime());
-  }
-}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java
 
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java
new file mode 100644
index 000000000..43bf5aed8
--- /dev/null
+++ 
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import java.time.Duration;
+
+/** Process trigger strategy. */
+public final class ProcessTriggerStrategy {
+
+  public static final ProcessTriggerStrategy METADATA_TRIGGER =
+      new ProcessTriggerStrategy(Duration.ofDays(1), true, 1);
+
+  private final Duration triggerInterval;
+
+  private final boolean triggerOnNewSnapshot;
+
+  private final int triggerParallelism;
+
+  public ProcessTriggerStrategy(
+      Duration triggerInterval, boolean triggerOnNewSnapshot, int 
triggerParallelism) {
+    this.triggerInterval = triggerInterval;
+    this.triggerOnNewSnapshot = triggerOnNewSnapshot;
+    this.triggerParallelism = triggerParallelism;
+  }
+
+  public static ProcessTriggerStrategy triggerAtFixRate(Duration 
triggerInterval) {
+    return new ProcessTriggerStrategy(triggerInterval, false, 1);
+  }
+
+  public Duration getTriggerInterval() {
+    return triggerInterval;
+  }
+
+  public boolean isTriggerOnNewSnapshot() {
+    return triggerOnNewSnapshot;
+  }
+
+  public int getTriggerParallelism() {
+    return triggerParallelism;
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java 
b/amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java
similarity index 58%
copy from 
amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
copy to 
amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java
index 7654af710..5e9c0c64d 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ 
b/amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java
@@ -16,25 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.table;
+package org.apache.amoro.process;
 
-import org.apache.amoro.ActivePlugin;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.TableRuntime;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Table runtime factory. */
-public interface TableRuntimeFactory extends ActivePlugin {
-
-  Optional<TableRuntimeCreator> accept(
-      ServerTableIdentifier tableIdentifier, Map<String, String> 
tableProperties);
-
-  interface TableRuntimeCreator {
-    List<StateKey<?>> requiredStateKeys();
-
-    TableRuntime create(TableRuntimeStore store);
+/** Exception thrown when a process recovery fails. */
+public class RecoverProcessFailedException extends RuntimeException {
+  public RecoverProcessFailedException(String message) {
+    super(message);
   }
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java 
b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
deleted file mode 100644
index c4e800ed1..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-import org.apache.amoro.Action;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.StateField;
-
-import java.util.Map;
-
-/** A common state of a table process. */
-public class TableProcessState implements ProcessState {
-
-  @StateField private volatile long id;
-  @StateField private volatile String externalProcessIdentifier;
-  private final Action action;
-  private final ServerTableIdentifier tableIdentifier;
-  private String executionEngine;
-  @StateField private int retryNumber = 0;
-  @StateField private long startTime = -1L;
-  @StateField private long endTime = -1L;
-  @StateField private ProcessStatus status = ProcessStatus.PENDING;
-  @StateField private volatile String failedReason;
-  private volatile Map<String, String> processParameters;
-  private volatile Map<String, String> summary;
-
-  public TableProcessState(Action action, ServerTableIdentifier 
tableIdentifier) {
-    this.action = action;
-    this.tableIdentifier = tableIdentifier;
-  }
-
-  public TableProcessState(long id, Action action, ServerTableIdentifier 
tableIdentifier) {
-    this.id = id;
-    this.action = action;
-    this.tableIdentifier = tableIdentifier;
-  }
-
-  public TableProcessState(
-      long id, Action action, ServerTableIdentifier tableIdentifier, String 
executionEngine) {
-    this.id = id;
-    this.action = action;
-    this.tableIdentifier = tableIdentifier;
-    this.executionEngine = executionEngine;
-  }
-
-  @Override
-  public long getId() {
-    return id;
-  }
-
-  public String getExternalProcessIdentifier() {
-    return externalProcessIdentifier;
-  }
-
-  public String getName() {
-    return action.getName();
-  }
-
-  public Action getAction() {
-    return action;
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public long getEndTime() {
-    return endTime;
-  }
-
-  public ProcessStatus getStatus() {
-    return status;
-  }
-
-  public String getExecutionEngine() {
-    return executionEngine;
-  }
-
-  @Override
-  public Map<String, String> getSummary() {
-    return summary;
-  }
-
-  @Override
-  public long getQuotaRuntime() {
-    return getDuration();
-  }
-
-  @Override
-  public double getQuotaValue() {
-    return 1;
-  }
-
-  public long getDuration() {
-    return endTime > 0 ? endTime - startTime : System.currentTimeMillis() - 
startTime;
-  }
-
-  public ServerTableIdentifier getTableIdentifier() {
-    return tableIdentifier;
-  }
-
-  public void setExternalProcessIdentifier(String externalProcessIdentifier) {
-    this.externalProcessIdentifier = externalProcessIdentifier;
-  }
-
-  public void setExecutionEngine(String executionEngine) {
-    this.executionEngine = executionEngine;
-  }
-
-  protected void setSummary(Map<String, String> summary) {
-    this.summary = summary;
-  }
-
-  protected void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  public void setStatus(ProcessStatus status) {
-    if (status == ProcessStatus.SUCCESS
-        || status == ProcessStatus.FAILED
-        || status == ProcessStatus.KILLED) {
-      endTime = System.currentTimeMillis();
-    } else if (this.status != ProcessStatus.SUBMITTED && status == 
ProcessStatus.SUBMITTED) {
-      endTime = -1L;
-      failedReason = null;
-      summary = null;
-    }
-    this.status = status;
-  }
-
-  public String getFailedReason() {
-    return failedReason;
-  }
-
-  public ProcessStage getStage() {
-    return status.toStage();
-  }
-
-  protected void setId(long processId) {
-    this.id = processId;
-  }
-
-  public Map<String, String> getProcessParameters() {
-    return processParameters;
-  }
-
-  public void setProcessParameters(Map<String, String> processParameters) {
-    this.processParameters = processParameters;
-  }
-
-  public void setSubmitted(String externalProcessIdentifier) {
-    this.status = ProcessStatus.SUBMITTED;
-    setExternalProcessIdentifier(externalProcessIdentifier);
-    this.startTime = System.currentTimeMillis();
-  }
-
-  public void setSubmitted() {
-    this.status = ProcessStatus.SUBMITTED;
-    this.startTime = System.currentTimeMillis();
-  }
-
-  public void setRunning() {
-    this.status = ProcessStatus.RUNNING;
-  }
-
-  public void setCanceling() {
-    this.status = ProcessStatus.CANCELING;
-  }
-
-  public void addRetryNumber() {
-    this.retryNumber += 1;
-    this.status = ProcessStatus.PENDING;
-    this.externalProcessIdentifier = "";
-    this.failedReason = null;
-  }
-
-  public void resetRetryNumber() {
-    this.retryNumber = 0;
-  }
-
-  public void setCanceled() {
-    this.status = ProcessStatus.CANCELED;
-  }
-
-  public void setCompleted() {
-    this.status = ProcessStatus.SUCCESS;
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void setKilled() {
-    this.status = ProcessStatus.KILLED;
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void setCompleted(String failedReason) {
-    this.status = ProcessStatus.FAILED;
-    this.failedReason = failedReason;
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public int getRetryNumber() {
-    return retryNumber;
-  }
-
-  public void setRetryNumber(int retryNumber) {
-    this.retryNumber = retryNumber;
-  }
-}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java 
b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
index 7654af710..90e26bd4b 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
@@ -21,6 +21,8 @@ package org.apache.amoro.table;
 import org.apache.amoro.ActivePlugin;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
 
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,10 @@ import java.util.Optional;
 /** Table runtime factory. */
 public interface TableRuntimeFactory extends ActivePlugin {
 
+  List<ActionCoordinator> supportedCoordinators();
+
+  void initialize(List<ProcessFactory> factories);
+
   Optional<TableRuntimeCreator> accept(
       ServerTableIdentifier tableIdentifier, Map<String, String> 
tableProperties);
 

Reply via email to