This is an automated email from the ASF dual-hosted git repository.

baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba51cda2 [Improvement]: Load process factories via 
DefaultTableRuntimeFactory (#4100)
7ba51cda2 is described below

commit 7ba51cda2bd9efa0216c915a98429cdce45642b1
Author: baiyangtx <[email protected]>
AuthorDate: Thu Mar 5 17:32:12 2026 +0800

    [Improvement]: Load process factories via DefaultTableRuntimeFactory (#4100)
    
    * feat(ams): load process factories via DefaultTableRuntimeFactory
    
    ## Summary
    
    Wire the new process plugin model into AMS so that table processes are 
discovered from `ProcessFactory` implementations and scheduled via the existing 
`ProcessService`.
    
    ## Details
    
    - Extend `DefaultTableRuntimeFactory` to implement the 
`TableRuntimeFactory` process APIs:
      - Aggregate installed `ProcessFactory` instances by `TableFormat` / 
`Action` and expose derived `ActionCoordinator` plugins via 
`supportedCoordinators()`
      - Merge `DefaultTableRuntime.REQUIRED_STATES` with additional states 
required by all process factories for the same format when building 
`TableRuntimeCreator`
      - Keep using `DefaultTableRuntime` without introducing extra runtime types
    - Introduce `DefaultActionCoordinator` to bridge `ProcessFactory` 
trigger/recover semantics to the existing scheduler:
      - Build trigger strategies from `ProcessFactory#triggerStrategy`
      - Delegate `trigger` and `recoverTableProcess` to the underlying factory
    - Add `TableProcessFactoryManager` as an 
`AbstractPluginManager<ProcessFactory>` using the `process-factories` plugin 
namespace
    - Refactor `AmoroServiceContainer` startup sequence:
      - Initialize `TableProcessFactoryManager` and collect all installed 
`ProcessFactory` instances
      - Initialize all `TableRuntimeFactory` plugins with the shared list of 
process factories
      - Collect all derived `ActionCoordinator`s from table runtime factories 
and inject them into `ProcessService`
    - Update `ProcessService` to accept a pre-built list of 
`ActionCoordinator`s while keeping the original constructors for backward 
compatibility and tests
    - Run `mvn spotless:apply` and `mvn -pl amoro-ams -am -DskipTests compile` 
to ensure style and compilation pass
    
    Co-Authored-By: Aime <[email protected]>
    Change-Id: Iaa867503c8b0bf93c2b7f0b8fe7d752e2ddbac67
    
    * refactor(table-runtime): decouple TableRuntimeFactory from ActivePlugin 
and use default runtime
    
    ## Summary
    
    Decouple `TableRuntimeFactory` from the generic `ActivePlugin` mechanism 
and make `AmoroServiceContainer` explicitly use `DefaultTableRuntimeFactory` as 
the default implementation.
    
    ## Details
    
    - Change `TableRuntimeFactory` in `amoro-common` so it no longer extends 
`ActivePlugin`, keeping only process-related APIs:
      - `List<ActionCoordinator> supportedCoordinators()`
      - `void initialize(List<ProcessFactory> factories)`
      - `Optional<TableRuntimeCreator> accept(ServerTableIdentifier, 
Map<String, String>)`
    - Refactor `TableRuntimeFactoryManager` in AMS:
      - Remove inheritance from `AbstractPluginManager<TableRuntimeFactory>` to 
avoid the `ActivePlugin` constraint
      - Implement a simple manager that wraps a provided 
`List<TableRuntimeFactory>`
      - Provide a default no-arg constructor that installs a single 
`DefaultTableRuntimeFactory`
      - Keep `initialize()` as a no-op and `installedPlugins()` as the accessor 
to preserve existing wiring in `DefaultTableService`
    - Update `DefaultTableRuntimeFactory`:
      - Remove `@Override` annotations from `open/close/name` since they no 
longer implement `ActivePlugin` methods
      - Keep the methods as no-op helpers with the same behavior
    - Update `AmoroServiceContainer.startOptimizingService` to use 
`DefaultTableRuntimeFactory` directly:
      - Construct a `DefaultTableRuntimeFactory` instance explicitly
      - Wrap it into `TableRuntimeFactoryManager` via 
`Collections.singletonList(tableRuntimeFactory)`
      - Use the resulting singleton list as the only table runtime factory when 
initializing process factories and collecting `ActionCoordinator`s
    - Leave `DefaultTableService` logic unchanged, it still uses 
`TableRuntimeFactoryManager.installedPlugins()` but now operates over the 
explicit default factory list
    - Ensure `ProcessService` changes from previous step are included in this 
commit so the module compiles cleanly
    
    ## Verification
    
    - Ran `./mvnw -pl amoro-ams -am -DskipTests compile` from repo root
      - Build succeeded for the full AMS reactor
      - `spotless` and `checkstyle` passed with only existing warnings 
unrelated to this change
    
    Co-Authored-By: Aime <[email protected]>
    Change-Id: Ifa8deef0d2553176300cdef2c0cb073d52ee3303
    
    * refactor(table-runtime): remove TableRuntimeFactoryManager and inline 
default factory
    
    ## Summary
    
    - Remove TableRuntimeFactoryManager indirection and wire 
DefaultTableService directly with a TableRuntimeFactory implementation.
    - Simplify DefaultTableRuntimeFactory after decoupling from plugin 
framework and ActivePlugin lifecycle.
    - Inline default table runtime factory wiring into AmoroServiceContainer 
and tests, and update process/service initialization.
    
    ## Details
    
    - DefaultTableService
      - Change constructor to accept a TableRuntimeFactory instead of 
TableRuntimeFactoryManager.
      - Replace iteration over installed factories with a single 
factory.accept(...) call when creating TableRuntime instances.
    
    - DefaultTableRuntimeFactory
      - Drop unused ActivePlugin-style methods: open(Map<String, String>), 
close(), and name().
    
    - AmoroServiceContainer
      - Instantiate a single DefaultTableRuntimeFactory in 
startOptimizingService and pass it into DefaultTableService.
      - Initialize the defaultRuntimeFactory with ProcessFactory plugins and 
derive ActionCoordinators from it directly.
    
    - Tests
      - AMSServiceTestBase: construct DefaultTableService with a concrete 
DefaultTableRuntimeFactory instead of a mocked TableRuntimeFactoryManager.
      - TestDefaultTableRuntimeHandler: hold a DefaultTableRuntimeFactory field 
and pass it into DefaultTableService for all test setups.
    
    - Cleanup
      - Delete TableRuntimeFactoryManager class and remove all references and 
related imports across main and test code.
      - Fix spotless formatting violations in the touched files so that `mvn 
-pl amoro-ams -am -DskipTests compile` passes.
    
    Co-Authored-By: Aime <[email protected]>
    Change-Id: I8acca1841470dfc1bbd87b77e424a34f4d52ae82
    
    * fix comment
    
    * fix comment
    
    * fix comment
    
    * fix comment
    
    ---------
    
    Co-authored-by: zhangyongxiang.alpha <[email protected]>
    Co-authored-by: Aime <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../apache/amoro/server/AmoroServiceContainer.java |  23 ++--
 .../amoro/server/process/ProcessService.java       |  36 ++----
 .../TableProcessFactoryManager.java}               |  18 ++-
 .../server/table/DefaultActionCoordinator.java     | 124 +++++++++++++++++++++
 .../server/table/DefaultTableRuntimeFactory.java   | 112 ++++++++++++++++---
 .../amoro/server/table/DefaultTableService.java    |  35 +++---
 .../apache/amoro/server/AMSServiceTestBase.java    |  14 +--
 .../table/TestDefaultTableRuntimeHandler.java      |  17 +--
 .../apache/amoro/table/TableRuntimeFactory.java    |   3 +-
 9 files changed, 283 insertions(+), 99 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 4fd3e5e9a..6fa3293e9 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -32,6 +32,8 @@ import org.apache.amoro.config.ConfigurationException;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
 import org.apache.amoro.exception.AmoroRuntimeException;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.catalog.DefaultCatalogManager;
 import org.apache.amoro.server.dashboard.DashboardServer;
@@ -47,16 +49,18 @@ import 
org.apache.amoro.server.persistence.DataSourceFactory;
 import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
 import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
 import org.apache.amoro.server.process.ProcessService;
+import org.apache.amoro.server.process.ProcessService.ExecuteEngineManager;
+import org.apache.amoro.server.process.TableProcessFactoryManager;
 import org.apache.amoro.server.resource.ContainerMetadata;
 import org.apache.amoro.server.resource.Containers;
 import org.apache.amoro.server.resource.DefaultOptimizerManager;
 import org.apache.amoro.server.resource.OptimizerManager;
 import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
 import org.apache.amoro.server.table.DefaultTableManager;
+import org.apache.amoro.server.table.DefaultTableRuntimeFactory;
 import org.apache.amoro.server.table.DefaultTableService;
 import org.apache.amoro.server.table.RuntimeHandlerChain;
 import org.apache.amoro.server.table.TableManager;
-import org.apache.amoro.server.table.TableRuntimeFactoryManager;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.terminal.TerminalManager;
 import org.apache.amoro.server.utils.ThriftServiceProxy;
@@ -229,17 +233,22 @@ public class AmoroServiceContainer {
   }
 
   public void startOptimizingService() throws Exception {
-    TableRuntimeFactoryManager tableRuntimeFactoryManager = new 
TableRuntimeFactoryManager();
-    tableRuntimeFactoryManager.initialize();
+    // Load process factories and build action coordinators from default table 
runtime factory.
+    TableProcessFactoryManager tableProcessFactoryManager = new 
TableProcessFactoryManager();
+    tableProcessFactoryManager.initialize();
+    List<ProcessFactory> processFactories = 
tableProcessFactoryManager.installedPlugins();
 
-    tableService =
-        new DefaultTableService(serviceConfig, catalogManager, 
tableRuntimeFactoryManager);
+    DefaultTableRuntimeFactory defaultRuntimeFactory = new 
DefaultTableRuntimeFactory();
+    defaultRuntimeFactory.initialize(processFactories);
 
+    List<ActionCoordinator> actionCoordinators = 
defaultRuntimeFactory.supportedCoordinators();
+    ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
+
+    tableService = new DefaultTableService(serviceConfig, catalogManager, 
defaultRuntimeFactory);
+    processService = new ProcessService(tableService, actionCoordinators, 
executeEngineManager);
     optimizingService =
         new DefaultOptimizingService(serviceConfig, catalogManager, 
optimizerManager, tableService);
 
-    processService = new ProcessService(serviceConfig, tableService);
-
     LOG.info("Setting up AMS table executors...");
     InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
     addHandlerChain(optimizingService.getTableRuntimeHandler());
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index 252d1574b..79881c61b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -23,7 +23,6 @@ import org.apache.amoro.AmoroTable;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.Configurations;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.process.ActionCoordinator;
 import org.apache.amoro.process.ProcessEvent;
@@ -64,8 +63,8 @@ public class ProcessService extends PersistentBase {
       new ConcurrentHashMap<>();
   private final Map<EngineType, ExecuteEngine> executeEngines = new 
ConcurrentHashMap<>();
 
-  private final ActionCoordinatorManager actionCoordinatorManager;
   private final ExecuteEngineManager executeEngineManager;
+  private final List<ActionCoordinator> actionCoordinatorList;
   private final ProcessRuntimeHandler tableRuntimeHandler = new 
ProcessRuntimeHandler();
   private final ThreadPoolExecutor processExecutionPool =
       new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>());
@@ -73,17 +72,16 @@ public class ProcessService extends PersistentBase {
   private final Map<ServerTableIdentifier, Map<Long, TableProcess>> 
activeTableProcess =
       new ConcurrentHashMap<>();
 
-  public ProcessService(Configurations serviceConfig, TableService 
tableService) {
-    this(serviceConfig, tableService, new ActionCoordinatorManager(), new 
ExecuteEngineManager());
+  public ProcessService(TableService tableService) {
+    this(tableService, Collections.emptyList(), new ExecuteEngineManager());
   }
 
   public ProcessService(
-      Configurations serviceConfig,
       TableService tableService,
-      ActionCoordinatorManager actionCoordinatorManager,
+      List<ActionCoordinator> actionCoordinators,
       ExecuteEngineManager executeEngineManager) {
     this.tableService = tableService;
-    this.actionCoordinatorManager = actionCoordinatorManager;
+    this.actionCoordinatorList = actionCoordinators;
     this.executeEngineManager = executeEngineManager;
   }
 
@@ -148,7 +146,6 @@ public class ProcessService extends PersistentBase {
 
   /** Dispose the service, shutdown engines and clear active processes. */
   public void dispose() {
-    actionCoordinatorManager.close();
     executeEngineManager.close();
     processExecutionPool.shutdown();
     activeTableProcess.clear();
@@ -156,16 +153,12 @@ public class ProcessService extends PersistentBase {
 
   private void initialize(List<TableRuntime> tableRuntimes) {
     LOG.info("Initializing process service");
-    actionCoordinatorManager.initialize();
-    actionCoordinatorManager
-        .installedPlugins()
-        .forEach(
-            actionCoordinator -> {
-              actionCoordinators.put(
-                  actionCoordinator.action().getName(),
-                  new ActionCoordinatorScheduler(
-                      actionCoordinator, tableService, ProcessService.this));
-            });
+    // Pre-configured coordinators built from TableRuntimeFactory / 
ProcessFactory
+    for (ActionCoordinator actionCoordinator : actionCoordinatorList) {
+      actionCoordinators.put(
+          actionCoordinator.action().getName(),
+          new ActionCoordinatorScheduler(actionCoordinator, tableService, 
ProcessService.this));
+    }
     executeEngineManager.initialize();
     executeEngineManager
         .installedPlugins()
@@ -553,13 +546,6 @@ public class ProcessService extends PersistentBase {
     }
   }
 
-  /** Manager for {@link ActionCoordinator} plugins. */
-  public static class ActionCoordinatorManager extends 
AbstractPluginManager<ActionCoordinator> {
-    public ActionCoordinatorManager() {
-      super("action-coordinators");
-    }
-  }
-
   /** Manager for {@link ExecuteEngine} plugins. */
   public static class ExecuteEngineManager extends 
AbstractPluginManager<ExecuteEngine> {
     public ExecuteEngineManager() {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java
old mode 100644
new mode 100755
similarity index 61%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java
rename to 
amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java
index 65a906108..f54290937
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java
@@ -16,15 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.table;
+package org.apache.amoro.server.process;
 
+import org.apache.amoro.process.ProcessFactory;
 import org.apache.amoro.server.manager.AbstractPluginManager;
-import org.apache.amoro.table.TableRuntimeFactory;
 
-public class TableRuntimeFactoryManager extends 
AbstractPluginManager<TableRuntimeFactory> {
-  public static final String PLUGIN_CATEGORY = "table-runtime-factories";
+/**
+ * Plugin manager for {@link ProcessFactory} implementations.
+ *
+ * <p>Process factories are configured via {@code 
plugins/process-factories.yaml} and are
+ * responsible for describing how different {@code TableFormat} / {@code 
Action} combinations should
+ * be scheduled and executed.
+ */
+public class TableProcessFactoryManager extends 
AbstractPluginManager<ProcessFactory> {
 
-  public TableRuntimeFactoryManager() {
-    super(PLUGIN_CATEGORY);
+  public TableProcessFactoryManager() {
+    super("process-factories");
   }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java
new file mode 100755
index 000000000..053e3c1cf
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.table;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
+import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Default implementation of {@link ActionCoordinator} that bridges {@link 
ProcessFactory}
+ * declarations to the AMS scheduling framework.
+ */
+public class DefaultActionCoordinator implements ActionCoordinator {
+
+  private final Action action;
+  private final TableFormat format;
+  private final ProcessFactory factory;
+  private final ProcessTriggerStrategy strategy;
+
+  public DefaultActionCoordinator(TableFormat format, Action action, 
ProcessFactory factory) {
+    this.action = action;
+    this.format = format;
+    this.factory = factory;
+    this.strategy = factory.triggerStrategy(format, action);
+    Preconditions.checkArgument(
+        strategy != null,
+        "ProcessTriggerStrategy cannot be null for format %s, action %s, 
factory %s",
+        format,
+        action,
+        factory.name());
+  }
+
+  @Override
+  public String name() {
+    // No need to be globally unique, this coordinator is not discovered via 
plugin manager.
+    return String.format("%s-%s-coordinator", format.name().toLowerCase(), 
action.getName());
+  }
+
+  @Override
+  public void open(Map<String, String> properties) {
+    // No-op: lifecycle is managed by owning TableRuntimeFactory.
+  }
+
+  @Override
+  public void close() {
+    // No-op: nothing to close.
+  }
+
+  @Override
+  public boolean formatSupported(TableFormat format) {
+    return this.format.equals(format);
+  }
+
+  @Override
+  public int parallelism() {
+    return strategy.getTriggerParallelism();
+  }
+
+  @Override
+  public Action action() {
+    return action;
+  }
+
+  @Override
+  public long getNextExecutingTime(TableRuntime tableRuntime) {
+    // Fixed-rate scheduling based on configured trigger interval.
+    return strategy.getTriggerInterval().toMillis();
+  }
+
+  @Override
+  public boolean enabled(TableRuntime tableRuntime) {
+    return formatSupported(tableRuntime.getFormat());
+  }
+
+  @Override
+  public long getExecutorDelay() {
+    return strategy.getTriggerInterval().toMillis();
+  }
+
+  @Override
+  public Optional<TableProcess> trigger(TableRuntime tableRuntime) {
+    return factory.trigger(tableRuntime, action);
+  }
+
+  @Override
+  public TableProcess recoverTableProcess(
+      TableRuntime tableRuntime, TableProcessStore processStore) {
+    try {
+      return factory.recover(tableRuntime, processStore);
+    } catch (RecoverProcessFailedException e) {
+      throw new IllegalStateException(
+          String.format(
+              "Failed to recover table process for format %s, action %s, table 
%s",
+              format, action, tableRuntime.getTableIdentifier()),
+          e);
+    }
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
index 41c8041f2..83050799a 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.amoro.server.table;
 
+import org.apache.amoro.Action;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
@@ -28,45 +29,122 @@ import org.apache.amoro.table.StateKey;
 import org.apache.amoro.table.TableRuntimeFactory;
 import org.apache.amoro.table.TableRuntimeStore;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
+/**
+ * Default {@link TableRuntimeFactory} implementation used by AMS.
+ *
+ * <p>Besides creating {@link DefaultTableRuntime} instances for mixed/iceberg 
formats, this factory
+ * also aggregates {@link ProcessFactory} declarations to expose {@link 
ActionCoordinator} plugins
+ * for different {@link TableFormat}/{@link Action} combinations.
+ */
 public class DefaultTableRuntimeFactory implements TableRuntimeFactory {
-  @Override
-  public void open(Map<String, String> properties) {}
 
-  @Override
-  public void close() {}
+  /** Mapping from table format to its supported actions and corresponding 
process factory. */
+  private final Map<TableFormat, Map<Action, ProcessFactory>> 
factoriesByFormat = new HashMap<>();
 
-  @Override
-  public String name() {
-    return "default";
-  }
+  /** Coordinators derived from all installed process factories. */
+  private final List<ActionCoordinator> supportedCoordinators = 
Lists.newArrayList();
 
   @Override
   public List<ActionCoordinator> supportedCoordinators() {
-    return Lists.newArrayList();
+    return Collections.unmodifiableList(supportedCoordinators);
   }
 
   @Override
-  public void initialize(List<ProcessFactory> factories) {}
+  public void initialize(List<ProcessFactory> factories) {
+    factoriesByFormat.clear();
+    supportedCoordinators.clear();
+
+    for (ProcessFactory factory : factories) {
+      Map<TableFormat, Set<Action>> supported = factory.supportedActions();
+      if (supported == null || supported.isEmpty()) {
+        continue;
+      }
+
+      for (Map.Entry<TableFormat, Set<Action>> entry : supported.entrySet()) {
+        TableFormat format = entry.getKey();
+        Map<Action, ProcessFactory> byAction =
+            factoriesByFormat.computeIfAbsent(format, k -> new HashMap<>());
+
+        for (Action action : entry.getValue()) {
+          ProcessFactory existed = byAction.get(action);
+          if (existed != null && existed != factory) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "ProcessFactory conflict for format %s and action %s, 
existing: %s, new: %s",
+                    format, action, existed.name(), factory.name()));
+          }
+          byAction.put(action, factory);
+          supportedCoordinators.add(new DefaultActionCoordinator(format, 
action, factory));
+        }
+      }
+    }
+  }
 
   @Override
   public Optional<TableRuntimeCreator> accept(
       ServerTableIdentifier tableIdentifier, Map<String, String> 
tableProperties) {
-    if (tableIdentifier
-        .getFormat()
-        .in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, 
TableFormat.ICEBERG)) {
-      return Optional.of(new TableRuntimeCreatorImpl());
+    TableFormat format = tableIdentifier.getFormat();
+    boolean defaultSupported =
+        format.in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, 
TableFormat.ICEBERG);
+    boolean hasProcessFactories = factoriesByFormat.containsKey(format);
+
+    if (!defaultSupported && !hasProcessFactories) {
+      return Optional.empty();
     }
-    return Optional.empty();
+
+    return Optional.of(new TableRuntimeCreatorImpl(format));
   }
 
-  private static class TableRuntimeCreatorImpl implements 
TableRuntimeFactory.TableRuntimeCreator {
+  private class TableRuntimeCreatorImpl implements 
TableRuntimeFactory.TableRuntimeCreator {
+
+    private final TableFormat format;
+
+    private TableRuntimeCreatorImpl(TableFormat format) {
+      this.format = format;
+    }
+
     @Override
     public List<StateKey<?>> requiredStateKeys() {
-      return DefaultTableRuntime.REQUIRED_STATES;
+      Map<String, StateKey<?>> merged = new LinkedHashMap<>();
+      // 1) DefaultTableRuntime required states
+      for (StateKey<?> stateKey : DefaultTableRuntime.REQUIRED_STATES) {
+        merged.put(stateKey.getKey(), stateKey);
+      }
+
+      // 2) Extra states from all process factories for this format (if any)
+      Map<Action, ProcessFactory> byAction = factoriesByFormat.get(format);
+      if (byAction != null) {
+        Map<String, ProcessFactory> deFactories = new HashMap<>();
+        byAction.forEach((a, f) -> deFactories.putIfAbsent(f.name(), f));
+        deFactories
+            .values()
+            .forEach(
+                factory ->
+                    factory
+                        .requiredStates()
+                        .forEach(
+                            stateKey -> {
+                              if (merged.containsKey(stateKey.getKey())) {
+                                throw new IllegalStateException(
+                                    "Failed to initialize table runtime 
creator, due to stateKey: "
+                                        + stateKey.getKey()
+                                        + " declared by process-factory: "
+                                        + factory.name()
+                                        + " has already been defined.");
+                              }
+                              merged.put(stateKey.getKey(), stateKey);
+                            }));
+      }
+
+      return Lists.newArrayList(merged.values());
     }
 
     @Override
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 464cd5601..a247b1804 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -44,6 +44,7 @@ import 
org.apache.amoro.shade.guava32.com.google.common.base.Objects;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.amoro.table.TableRuntimeFactory;
 import org.apache.amoro.table.TableSummary;
 import org.apache.amoro.utils.TablePropertyUtil;
 import org.slf4j.Logger;
@@ -85,19 +86,19 @@ public class DefaultTableService extends PersistentBase 
implements TableService
   private final CompletableFuture<Boolean> initialized = new 
CompletableFuture<>();
   private final Configurations serverConfiguration;
   private final CatalogManager catalogManager;
-  private final TableRuntimeFactoryManager tableRuntimeFactoryManager;
+  private final TableRuntimeFactory tableRuntimeFactory;
   private RuntimeHandlerChain headHandler;
   private ExecutorService tableExplorerExecutors;
 
   public DefaultTableService(
       Configurations configuration,
       CatalogManager catalogManager,
-      TableRuntimeFactoryManager tableRuntimeFactoryManager) {
+      TableRuntimeFactory tableRuntimeFactory) {
     this.catalogManager = catalogManager;
     this.externalCatalogRefreshingInterval =
         
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
     this.serverConfiguration = configuration;
-    this.tableRuntimeFactoryManager = tableRuntimeFactoryManager;
+    this.tableRuntimeFactory = tableRuntimeFactory;
   }
 
   @Override
@@ -515,21 +516,19 @@ public class DefaultTableService extends PersistentBase 
implements TableService
       ServerTableIdentifier identifier,
       TableRuntimeMeta runtimeMeta,
       List<TableRuntimeState> restoredStates) {
-    return tableRuntimeFactoryManager.installedPlugins().stream()
-        .map(f -> f.accept(identifier, runtimeMeta.getTableConfig()))
-        .filter(Optional::isPresent)
-        .map(Optional::get)
-        .findFirst()
-        .map(
-            creator -> {
-              DefaultTableRuntimeStore store =
-                  new DefaultTableRuntimeStore(
-                      identifier, runtimeMeta, creator.requiredStateKeys(), 
restoredStates);
-              store.setRuntimeHandler(this);
-              TableRuntime tableRuntime = creator.create(store);
-              store.setTableRuntime(tableRuntime);
-              return tableRuntime;
-            });
+    Optional<TableRuntimeFactory.TableRuntimeCreator> creatorOpt =
+        tableRuntimeFactory.accept(identifier, runtimeMeta.getTableConfig());
+
+    return creatorOpt.map(
+        creator -> {
+          DefaultTableRuntimeStore store =
+              new DefaultTableRuntimeStore(
+                  identifier, runtimeMeta, creator.requiredStateKeys(), 
restoredStates);
+          store.setRuntimeHandler(this);
+          TableRuntime tableRuntime = creator.create(store);
+          store.setTableRuntime(tableRuntime);
+          return tableRuntime;
+        });
   }
 
   private void revertTableRuntimeAdded(
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 9f3d25b3f..f7266ba04 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -26,12 +26,9 @@ import org.apache.amoro.server.process.ProcessService;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.DefaultTableRuntimeFactory;
 import org.apache.amoro.server.table.DefaultTableService;
-import org.apache.amoro.server.table.TableRuntimeFactoryManager;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.mockito.Mockito;
 
 import java.time.Duration;
 
@@ -40,26 +37,20 @@ public abstract class AMSServiceTestBase extends 
AMSManagerTestBase {
   private static DefaultOptimizingService OPTIMIZING_SERVICE = null;
   private static ProcessService PROCESS_SERVICE = null;
 
-  private static TableRuntimeFactoryManager tableRuntimeFactoryManager = null;
-
   @BeforeClass
   public static void initTableService() {
     DefaultTableRuntimeFactory runtimeFactory = new 
DefaultTableRuntimeFactory();
-    tableRuntimeFactoryManager = 
Mockito.mock(TableRuntimeFactoryManager.class);
-    Mockito.when(tableRuntimeFactoryManager.installedPlugins())
-        .thenReturn(Lists.newArrayList(runtimeFactory));
     try {
       Configurations configurations = new Configurations();
       configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, 
Duration.ofMillis(800L));
       configurations.set(
           AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, 
Duration.ofMillis(30000L));
       TABLE_SERVICE =
-          new DefaultTableService(
-              new Configurations(), CATALOG_MANAGER, 
tableRuntimeFactoryManager);
+          new DefaultTableService(new Configurations(), CATALOG_MANAGER, 
runtimeFactory);
       OPTIMIZING_SERVICE =
           new DefaultOptimizingService(
               configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, 
TABLE_SERVICE);
-      PROCESS_SERVICE = new ProcessService(configurations, TABLE_SERVICE);
+      PROCESS_SERVICE = new ProcessService(TABLE_SERVICE);
 
       
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());
       TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain());
@@ -80,7 +71,6 @@ public abstract class AMSServiceTestBase extends 
AMSManagerTestBase {
     TABLE_SERVICE.dispose();
     MetricManager.dispose();
     EventsManager.dispose();
-    tableRuntimeFactoryManager = null;
   }
 
   protected DefaultTableService tableService() {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
index 792faebcf..898b3d015 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
@@ -41,7 +41,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
 
 import java.util.List;
 
@@ -49,7 +48,7 @@ import java.util.List;
 public class TestDefaultTableRuntimeHandler extends AMSTableTestBase {
 
   private DefaultTableService tableService;
-  private final TableRuntimeFactoryManager runtimeFactoryManager;
+  private final DefaultTableRuntimeFactory runtimeFactory;
 
   @Parameterized.Parameters(name = "{0}, {1}")
   public static Object[] parameters() {
@@ -66,16 +65,12 @@ public class TestDefaultTableRuntimeHandler extends 
AMSTableTestBase {
   public TestDefaultTableRuntimeHandler(
       CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
     super(catalogTestHelper, tableTestHelper, false);
-    DefaultTableRuntimeFactory runtimeFactory = new 
DefaultTableRuntimeFactory();
-    runtimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class);
-    Mockito.when(runtimeFactoryManager.installedPlugins())
-        .thenReturn(Lists.newArrayList(runtimeFactory));
+    this.runtimeFactory = new DefaultTableRuntimeFactory();
   }
 
   @Test
   public void testInitialize() throws Exception {
-    tableService =
-        new DefaultTableService(new Configurations(), CATALOG_MANAGER, 
runtimeFactoryManager);
+    tableService = new DefaultTableService(new Configurations(), 
CATALOG_MANAGER, runtimeFactory);
     TestHandler handler = new TestHandler();
     tableService.addHandlerChain(handler);
     tableService.initialize();
@@ -94,8 +89,7 @@ public class TestDefaultTableRuntimeHandler extends 
AMSTableTestBase {
     Assert.assertTrue(handler.isDisposed());
 
     // initialize with a history table
-    tableService =
-        new DefaultTableService(new Configurations(), CATALOG_MANAGER, 
runtimeFactoryManager);
+    tableService = new DefaultTableService(new Configurations(), 
CATALOG_MANAGER, runtimeFactory);
     handler = new TestHandler();
     tableService.addHandlerChain(handler);
     tableService.initialize();
@@ -132,8 +126,7 @@ public class TestDefaultTableRuntimeHandler extends 
AMSTableTestBase {
 
   @Test
   public void testRefreshUpdatesOptimizerGroup() throws Exception {
-    tableService =
-        new DefaultTableService(new Configurations(), CATALOG_MANAGER, 
runtimeFactoryManager);
+    tableService = new DefaultTableService(new Configurations(), 
CATALOG_MANAGER, runtimeFactory);
     TestHandler handler = new TestHandler();
     tableService.addHandlerChain(handler);
     tableService.initialize();
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java 
b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
index 90e26bd4b..5799166c5 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.amoro.table;
 
-import org.apache.amoro.ActivePlugin;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.process.ActionCoordinator;
@@ -29,7 +28,7 @@ import java.util.Map;
 import java.util.Optional;
 
 /** Table runtime factory. */
-public interface TableRuntimeFactory extends ActivePlugin {
+public interface TableRuntimeFactory {
 
   List<ActionCoordinator> supportedCoordinators();
 


Reply via email to