This is an automated email from the ASF dual-hosted git repository.
baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 7ba51cda2 [Improvement]: Load process factories via DefaultTableRuntimeFactory (#4100)
7ba51cda2 is described below
commit 7ba51cda2bd9efa0216c915a98429cdce45642b1
Author: baiyangtx <[email protected]>
AuthorDate: Thu Mar 5 17:32:12 2026 +0800
[Improvement]: Load process factories via DefaultTableRuntimeFactory (#4100)
* feat(ams): load process factories via DefaultTableRuntimeFactory
## Summary
Wire the new process plugin model into AMS so that table processes are
discovered from `ProcessFactory` implementations and scheduled via the existing
`ProcessService`.
## Details
- Extend `DefaultTableRuntimeFactory` to implement the `TableRuntimeFactory`
  process APIs:
  - Aggregate installed `ProcessFactory` instances by `TableFormat` / `Action`
    and expose derived `ActionCoordinator` plugins via `supportedCoordinators()`
  - Merge `DefaultTableRuntime.REQUIRED_STATES` with additional states required
    by all process factories for the same format when building
    `TableRuntimeCreator`
  - Keep using `DefaultTableRuntime` without introducing extra runtime types
- Introduce `DefaultActionCoordinator` to bridge `ProcessFactory`
  trigger/recover semantics to the existing scheduler:
  - Build trigger strategies from `ProcessFactory#triggerStrategy`
  - Delegate `trigger` and `recoverTableProcess` to the underlying factory
- Add `TableProcessFactoryManager` as an `AbstractPluginManager<ProcessFactory>`
  using the `process-factories` plugin namespace
- Refactor `AmoroServiceContainer` startup sequence:
  - Initialize `TableProcessFactoryManager` and collect all installed
    `ProcessFactory` instances
  - Initialize all `TableRuntimeFactory` plugins with the shared list of
    process factories
  - Collect all derived `ActionCoordinator`s from table runtime factories and
    inject them into `ProcessService`
- Update `ProcessService` to accept a pre-built list of `ActionCoordinator`s
  while keeping the original constructors for backward compatibility and tests
- Run `mvn spotless:apply` and `mvn -pl amoro-ams -am -DskipTests compile` to
  ensure style and compilation pass
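
The aggregation step described above can be sketched roughly as follows. This is a minimal illustration only: `FactoryIndexSketch`, `SketchFactory`, and `SketchCoordinator` are hypothetical stand-ins (formats and actions are plain strings here), not the real Amoro types, and the actual logic lives in `DefaultTableRuntimeFactory#initialize`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of indexing process factories by format and action. */
final class FactoryIndexSketch {

  /** Stand-in for a process factory: a name plus format -> actions it supports. */
  interface SketchFactory {
    String name();

    Map<String, Set<String>> supportedActions();
  }

  /** Convenience constructor for the stand-in factory (illustration only). */
  static SketchFactory factory(String name, Map<String, Set<String>> supported) {
    return new SketchFactory() {
      @Override
      public String name() {
        return name;
      }

      @Override
      public Map<String, Set<String>> supportedActions() {
        return supported;
      }
    };
  }

  /** Stand-in for a derived coordinator: one per (format, action) pair. */
  static final class SketchCoordinator {
    final String format;
    final String action;
    final SketchFactory factory;

    SketchCoordinator(String format, String action, SketchFactory factory) {
      this.format = format;
      this.action = action;
      this.factory = factory;
    }

    String name() {
      return format.toLowerCase(Locale.ROOT) + "-" + action + "-coordinator";
    }
  }

  private final Map<String, Map<String, SketchFactory>> factoriesByFormat = new HashMap<>();
  private final List<SketchCoordinator> coordinators = new ArrayList<>();

  /** Rebuilds the index; rejects two different factories claiming the same pair. */
  void initialize(List<SketchFactory> factories) {
    factoriesByFormat.clear();
    coordinators.clear();
    for (SketchFactory factory : factories) {
      Map<String, Set<String>> supported = factory.supportedActions();
      if (supported == null || supported.isEmpty()) {
        continue; // a factory that declares nothing contributes no coordinators
      }
      for (Map.Entry<String, Set<String>> entry : supported.entrySet()) {
        String format = entry.getKey();
        Map<String, SketchFactory> byAction =
            factoriesByFormat.computeIfAbsent(format, k -> new HashMap<>());
        for (String action : entry.getValue()) {
          SketchFactory existing = byAction.get(action);
          if (existing != null && existing != factory) {
            throw new IllegalArgumentException(
                String.format(
                    "Factory conflict for format %s and action %s, existing: %s, new: %s",
                    format, action, existing.name(), factory.name()));
          }
          byAction.put(action, factory);
          coordinators.add(new SketchCoordinator(format, action, factory));
        }
      }
    }
  }

  /** Read-only view, so callers cannot mutate the derived list. */
  List<SketchCoordinator> supportedCoordinators() {
    return Collections.unmodifiableList(coordinators);
  }
}
```

Failing fast on a conflicting (format, action) pair keeps a misconfigured plugin set from silently shadowing another factory at scheduling time.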
Co-Authored-By: Aime <[email protected]>
Change-Id: Iaa867503c8b0bf93c2b7f0b8fe7d752e2ddbac67
* refactor(table-runtime): decouple TableRuntimeFactory from ActivePlugin and use default runtime
## Summary
Decouple `TableRuntimeFactory` from the generic `ActivePlugin` mechanism
and make `AmoroServiceContainer` explicitly use `DefaultTableRuntimeFactory` as
the default implementation.
## Details
- Change `TableRuntimeFactory` in `amoro-common` so it no longer extends
  `ActivePlugin`, keeping only process-related APIs:
  - `List<ActionCoordinator> supportedCoordinators()`
  - `void initialize(List<ProcessFactory> factories)`
  - `Optional<TableRuntimeCreator> accept(ServerTableIdentifier, Map<String, String>)`
- Refactor `TableRuntimeFactoryManager` in AMS:
  - Remove inheritance from `AbstractPluginManager<TableRuntimeFactory>` to
    avoid the `ActivePlugin` constraint
  - Implement a simple manager that wraps a provided `List<TableRuntimeFactory>`
  - Provide a default no-arg constructor that installs a single
    `DefaultTableRuntimeFactory`
  - Keep `initialize()` as a no-op and `installedPlugins()` as the accessor to
    preserve existing wiring in `DefaultTableService`
- Update `DefaultTableRuntimeFactory`:
  - Remove `@Override` annotations from `open/close/name` since they no longer
    implement `ActivePlugin` methods
  - Keep the methods as no-op helpers with the same behavior
- Update `AmoroServiceContainer.startOptimizingService` to use
  `DefaultTableRuntimeFactory` directly:
  - Construct a `DefaultTableRuntimeFactory` instance explicitly
  - Wrap it into `TableRuntimeFactoryManager` via
    `Collections.singletonList(tableRuntimeFactory)`
  - Use the resulting singleton list as the only table runtime factory when
    initializing process factories and collecting `ActionCoordinator`s
- Leave `DefaultTableService` logic unchanged; it still uses
  `TableRuntimeFactoryManager.installedPlugins()` but now operates over the
  explicit default factory list
- Ensure `ProcessService` changes from the previous step are included in this
  commit so the module compiles cleanly
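
The intermediate manager described in this step could look roughly like the sketch below. All names here (`RuntimeFactoryManagerSketch`, `SketchRuntimeFactory`, `DefaultSketchFactory`) are hypothetical stand-ins that use strings instead of the real Amoro types; the point is only the shape: a plain list wrapper with a default no-arg constructor, a no-op `initialize()`, and an `installedPlugins()` accessor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of a manager that wraps a list instead of extending a plugin manager. */
final class RuntimeFactoryManagerSketch {

  /** Stand-in for a runtime factory that has no plugin lifecycle methods. */
  interface SketchRuntimeFactory {
    Optional<String> accept(String format);
  }

  /** Stand-in default factory; accepts a single format for illustration. */
  static final class DefaultSketchFactory implements SketchRuntimeFactory {
    @Override
    public Optional<String> accept(String format) {
      return "ICEBERG".equals(format) ? Optional.of("default-runtime") : Optional.empty();
    }
  }

  private final List<SketchRuntimeFactory> factories;

  /** Default wiring: exactly one default factory is installed. */
  RuntimeFactoryManagerSketch() {
    this(Collections.singletonList(new DefaultSketchFactory()));
  }

  /** Explicit wiring: wrap whatever list the caller provides (defensively copied). */
  RuntimeFactoryManagerSketch(List<SketchRuntimeFactory> factories) {
    this.factories = Collections.unmodifiableList(new ArrayList<>(factories));
  }

  /** Kept as a no-op so existing callers that invoke it keep compiling. */
  void initialize() {}

  /** Same accessor name as before, so downstream wiring is unchanged. */
  List<SketchRuntimeFactory> installedPlugins() {
    return factories;
  }
}
```

Keeping the accessor name stable is what lets the service code stay untouched while the plugin-manager inheritance is removed underneath it.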
## Verification
- Ran `./mvnw -pl amoro-ams -am -DskipTests compile` from repo root
- Build succeeded for the full AMS reactor
- `spotless` and `checkstyle` passed with only existing warnings
unrelated to this change
Co-Authored-By: Aime <[email protected]>
Change-Id: Ifa8deef0d2553176300cdef2c0cb073d52ee3303
* refactor(table-runtime): remove TableRuntimeFactoryManager and inline default factory
## Summary
- Remove TableRuntimeFactoryManager indirection and wire DefaultTableService
  directly with a TableRuntimeFactory implementation.
- Simplify DefaultTableRuntimeFactory after decoupling from the plugin framework
  and ActivePlugin lifecycle.
- Inline default table runtime factory wiring into AmoroServiceContainer and
  tests, and update process/service initialization.
## Details
- DefaultTableService
  - Change constructor to accept a TableRuntimeFactory instead of
    TableRuntimeFactoryManager.
  - Replace iteration over installed factories with a single
    factory.accept(...) call when creating TableRuntime instances.
- DefaultTableRuntimeFactory
  - Drop unused ActivePlugin-style methods: open(Map<String, String>),
    close(), and name().
- AmoroServiceContainer
  - Instantiate a single DefaultTableRuntimeFactory in startOptimizingService
    and pass it into DefaultTableService.
  - Initialize the defaultRuntimeFactory with ProcessFactory plugins and derive
    ActionCoordinators from it directly.
- Tests
  - AMSServiceTestBase: construct DefaultTableService with a concrete
    DefaultTableRuntimeFactory instead of a mocked TableRuntimeFactoryManager.
  - TestDefaultTableRuntimeHandler: hold a DefaultTableRuntimeFactory field and
    pass it into DefaultTableService for all test setups.
- Cleanup
  - Delete TableRuntimeFactoryManager class and remove all references and
    related imports across main and test code.
  - Fix spotless formatting violations in the touched files so that
    `mvn -pl amoro-ams -am -DskipTests compile` passes.
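
The single-factory `accept(...)` path described above can be sketched as follows. This is an illustration under simplifying assumptions: `SingleFactorySketch` and its nested types are hypothetical, formats are plain strings, and the "creator" and "runtime" are modeled as strings rather than the real `TableRuntimeCreator` / `TableRuntime` objects.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch: one factory decides acceptance, the service maps the result. */
final class SingleFactorySketch {

  /** Stand-in for the runtime factory contract. */
  interface SketchRuntimeFactory {
    Optional<String> accept(String format);
  }

  /** Accepts built-in formats plus any format that has process factories registered. */
  static final class DefaultSketchFactory implements SketchRuntimeFactory {
    private static final Set<String> DEFAULT_FORMATS =
        new HashSet<>(Arrays.asList("MIXED_ICEBERG", "MIXED_HIVE", "ICEBERG"));

    private final Set<String> formatsWithProcessFactories;

    DefaultSketchFactory(Set<String> formatsWithProcessFactories) {
      this.formatsWithProcessFactories = new HashSet<>(formatsWithProcessFactories);
    }

    @Override
    public Optional<String> accept(String format) {
      boolean defaultSupported = DEFAULT_FORMATS.contains(format);
      boolean hasProcessFactories = formatsWithProcessFactories.contains(format);
      if (!defaultSupported && !hasProcessFactories) {
        return Optional.empty();
      }
      return Optional.of("creator-for-" + format);
    }
  }

  /** Replaces a stream over many factories with one accept(...) call plus Optional.map. */
  static Optional<String> createTableRuntime(SketchRuntimeFactory factory, String format) {
    return factory.accept(format).map(creator -> "runtime(" + creator + ")");
  }
}
```

With a single factory there is no "first match wins" ordering to reason about; an empty `Optional` simply means no runtime is created for that table.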
Co-Authored-By: Aime <[email protected]>
Change-Id: I8acca1841470dfc1bbd87b77e424a34f4d52ae82
* fix comment
* fix comment
* fix comment
* fix comment
---------
Co-authored-by: zhangyongxiang.alpha <[email protected]>
Co-authored-by: Aime <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 23 ++--
.../amoro/server/process/ProcessService.java | 36 ++----
.../TableProcessFactoryManager.java} | 18 ++-
.../server/table/DefaultActionCoordinator.java | 124 +++++++++++++++++++++
.../server/table/DefaultTableRuntimeFactory.java | 112 ++++++++++++++++---
.../amoro/server/table/DefaultTableService.java | 35 +++---
.../apache/amoro/server/AMSServiceTestBase.java | 14 +--
.../table/TestDefaultTableRuntimeHandler.java | 17 +--
.../apache/amoro/table/TableRuntimeFactory.java | 3 +-
9 files changed, 283 insertions(+), 99 deletions(-)
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 4fd3e5e9a..6fa3293e9 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -32,6 +32,8 @@ import org.apache.amoro.config.ConfigurationException;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
@@ -47,16 +49,18 @@ import org.apache.amoro.server.persistence.DataSourceFactory;
import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.process.ProcessService;
+import org.apache.amoro.server.process.ProcessService.ExecuteEngineManager;
+import org.apache.amoro.server.process.TableProcessFactoryManager;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.DefaultOptimizerManager;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
import org.apache.amoro.server.table.DefaultTableManager;
+import org.apache.amoro.server.table.DefaultTableRuntimeFactory;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
-import org.apache.amoro.server.table.TableRuntimeFactoryManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.server.utils.ThriftServiceProxy;
@@ -229,17 +233,22 @@ public class AmoroServiceContainer {
}
public void startOptimizingService() throws Exception {
- TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
- tableRuntimeFactoryManager.initialize();
+ // Load process factories and build action coordinators from default table runtime factory.
+ TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
+ tableProcessFactoryManager.initialize();
+ List<ProcessFactory> processFactories = tableProcessFactoryManager.installedPlugins();
- tableService =
- new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
+ DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
+ defaultRuntimeFactory.initialize(processFactories);
+ List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
+ ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
+
+ tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory);
+ processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);
optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
- processService = new ProcessService(serviceConfig, tableService);
-
LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
addHandlerChain(optimizingService.getTableRuntimeHandler());
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index 252d1574b..79881c61b 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -23,7 +23,6 @@ import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
-import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ProcessEvent;
@@ -64,8 +63,8 @@ public class ProcessService extends PersistentBase {
new ConcurrentHashMap<>();
private final Map<EngineType, ExecuteEngine> executeEngines = new ConcurrentHashMap<>();
- private final ActionCoordinatorManager actionCoordinatorManager;
private final ExecuteEngineManager executeEngineManager;
+ private final List<ActionCoordinator> actionCoordinatorList;
private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler();
private final ThreadPoolExecutor processExecutionPool =
new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@@ -73,17 +72,16 @@ public class ProcessService extends PersistentBase {
private final Map<ServerTableIdentifier, Map<Long, TableProcess>> activeTableProcess =
new ConcurrentHashMap<>();
- public ProcessService(Configurations serviceConfig, TableService tableService) {
- this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager());
+ public ProcessService(TableService tableService) {
+ this(tableService, Collections.emptyList(), new ExecuteEngineManager());
}
public ProcessService(
- Configurations serviceConfig,
TableService tableService,
- ActionCoordinatorManager actionCoordinatorManager,
+ List<ActionCoordinator> actionCoordinators,
ExecuteEngineManager executeEngineManager) {
this.tableService = tableService;
- this.actionCoordinatorManager = actionCoordinatorManager;
+ this.actionCoordinatorList = actionCoordinators;
this.executeEngineManager = executeEngineManager;
}
@@ -148,7 +146,6 @@ public class ProcessService extends PersistentBase {
/** Dispose the service, shutdown engines and clear active processes. */
public void dispose() {
- actionCoordinatorManager.close();
executeEngineManager.close();
processExecutionPool.shutdown();
activeTableProcess.clear();
@@ -156,16 +153,12 @@ public class ProcessService extends PersistentBase {
private void initialize(List<TableRuntime> tableRuntimes) {
LOG.info("Initializing process service");
- actionCoordinatorManager.initialize();
- actionCoordinatorManager
- .installedPlugins()
- .forEach(
- actionCoordinator -> {
- actionCoordinators.put(
- actionCoordinator.action().getName(),
- new ActionCoordinatorScheduler(
- actionCoordinator, tableService, ProcessService.this));
- });
+ // Pre-configured coordinators built from TableRuntimeFactory / ProcessFactory
+ for (ActionCoordinator actionCoordinator : actionCoordinatorList) {
+ actionCoordinators.put(
+ actionCoordinator.action().getName(),
+ new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this));
+ }
executeEngineManager.initialize();
executeEngineManager
.installedPlugins()
@@ -553,13 +546,6 @@ public class ProcessService extends PersistentBase {
}
}
- /** Manager for {@link ActionCoordinator} plugins. */
- public static class ActionCoordinatorManager extends AbstractPluginManager<ActionCoordinator> {
- public ActionCoordinatorManager() {
- super("action-coordinators");
- }
- }
-
/** Manager for {@link ExecuteEngine} plugins. */
public static class ExecuteEngineManager extends AbstractPluginManager<ExecuteEngine> {
public ExecuteEngineManager() {
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java
old mode 100644
new mode 100755
similarity index 61%
rename from amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java
rename to amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java
index 65a906108..f54290937
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java
@@ -16,15 +16,21 @@
* limitations under the License.
*/
-package org.apache.amoro.server.table;
+package org.apache.amoro.server.process;
+import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.manager.AbstractPluginManager;
-import org.apache.amoro.table.TableRuntimeFactory;
-public class TableRuntimeFactoryManager extends AbstractPluginManager<TableRuntimeFactory> {
- public static final String PLUGIN_CATEGORY = "table-runtime-factories";
+/**
+ * Plugin manager for {@link ProcessFactory} implementations.
+ *
+ * <p>Process factories are configured via {@code plugins/process-factories.yaml} and are
+ * responsible for describing how different {@code TableFormat} / {@code Action} combinations should
+ * be scheduled and executed.
+ */
+public class TableProcessFactoryManager extends AbstractPluginManager<ProcessFactory> {
- public TableRuntimeFactoryManager() {
- super(PLUGIN_CATEGORY);
+ public TableProcessFactoryManager() {
+ super("process-factories");
}
}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java
new file mode 100755
index 000000000..053e3c1cf
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.table;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
+import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Default implementation of {@link ActionCoordinator} that bridges {@link ProcessFactory}
+ * declarations to the AMS scheduling framework.
+ */
+public class DefaultActionCoordinator implements ActionCoordinator {
+
+ private final Action action;
+ private final TableFormat format;
+ private final ProcessFactory factory;
+ private final ProcessTriggerStrategy strategy;
+
+ public DefaultActionCoordinator(TableFormat format, Action action, ProcessFactory factory) {
+ this.action = action;
+ this.format = format;
+ this.factory = factory;
+ this.strategy = factory.triggerStrategy(format, action);
+ Preconditions.checkArgument(
+ strategy != null,
+ "ProcessTriggerStrategy cannot be null for format %s, action %s, factory %s",
+ format,
+ action,
+ factory.name());
+ }
+
+ @Override
+ public String name() {
+ // No need to be globally unique, this coordinator is not discovered via plugin manager.
+ return String.format("%s-%s-coordinator", format.name().toLowerCase(), action.getName());
+ }
+
+ @Override
+ public void open(Map<String, String> properties) {
+ // No-op: lifecycle is managed by owning TableRuntimeFactory.
+ }
+
+ @Override
+ public void close() {
+ // No-op: nothing to close.
+ }
+
+ @Override
+ public boolean formatSupported(TableFormat format) {
+ return this.format.equals(format);
+ }
+
+ @Override
+ public int parallelism() {
+ return strategy.getTriggerParallelism();
+ }
+
+ @Override
+ public Action action() {
+ return action;
+ }
+
+ @Override
+ public long getNextExecutingTime(TableRuntime tableRuntime) {
+ // Fixed-rate scheduling based on configured trigger interval.
+ return strategy.getTriggerInterval().toMillis();
+ }
+
+ @Override
+ public boolean enabled(TableRuntime tableRuntime) {
+ return formatSupported(tableRuntime.getFormat());
+ }
+
+ @Override
+ public long getExecutorDelay() {
+ return strategy.getTriggerInterval().toMillis();
+ }
+
+ @Override
+ public Optional<TableProcess> trigger(TableRuntime tableRuntime) {
+ return factory.trigger(tableRuntime, action);
+ }
+
+ @Override
+ public TableProcess recoverTableProcess(
+ TableRuntime tableRuntime, TableProcessStore processStore) {
+ try {
+ return factory.recover(tableRuntime, processStore);
+ } catch (RecoverProcessFailedException e) {
+ throw new IllegalStateException(
+ String.format(
+ "Failed to recover table process for format %s, action %s, table %s",
+ format, action, tableRuntime.getTableIdentifier()),
+ e);
+ }
+ }
+}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
index 41c8041f2..83050799a 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
@@ -18,6 +18,7 @@
package org.apache.amoro.server.table;
+import org.apache.amoro.Action;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
@@ -28,45 +29,122 @@ import org.apache.amoro.table.StateKey;
import org.apache.amoro.table.TableRuntimeFactory;
import org.apache.amoro.table.TableRuntimeStore;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+/**
+ * Default {@link TableRuntimeFactory} implementation used by AMS.
+ *
+ * <p>Besides creating {@link DefaultTableRuntime} instances for mixed/iceberg formats, this factory
+ * also aggregates {@link ProcessFactory} declarations to expose {@link ActionCoordinator} plugins
+ * for different {@link TableFormat}/{@link Action} combinations.
+ */
public class DefaultTableRuntimeFactory implements TableRuntimeFactory {
- @Override
- public void open(Map<String, String> properties) {}
- @Override
- public void close() {}
+ /** Mapping from table format to its supported actions and corresponding process factory. */
+ private final Map<TableFormat, Map<Action, ProcessFactory>> factoriesByFormat = new HashMap<>();
- @Override
- public String name() {
- return "default";
- }
+ /** Coordinators derived from all installed process factories. */
+ private final List<ActionCoordinator> supportedCoordinators = Lists.newArrayList();
@Override
public List<ActionCoordinator> supportedCoordinators() {
- return Lists.newArrayList();
+ return Collections.unmodifiableList(supportedCoordinators);
}
@Override
- public void initialize(List<ProcessFactory> factories) {}
+ public void initialize(List<ProcessFactory> factories) {
+ factoriesByFormat.clear();
+ supportedCoordinators.clear();
+
+ for (ProcessFactory factory : factories) {
+ Map<TableFormat, Set<Action>> supported = factory.supportedActions();
+ if (supported == null || supported.isEmpty()) {
+ continue;
+ }
+
+ for (Map.Entry<TableFormat, Set<Action>> entry : supported.entrySet()) {
+ TableFormat format = entry.getKey();
+ Map<Action, ProcessFactory> byAction =
+ factoriesByFormat.computeIfAbsent(format, k -> new HashMap<>());
+
+ for (Action action : entry.getValue()) {
+ ProcessFactory existed = byAction.get(action);
+ if (existed != null && existed != factory) {
+ throw new IllegalArgumentException(
+ String.format(
+ "ProcessFactory conflict for format %s and action %s, existing: %s, new: %s",
+ format, action, existed.name(), factory.name()));
+ }
+ byAction.put(action, factory);
+ supportedCoordinators.add(new DefaultActionCoordinator(format, action, factory));
+ }
+ }
+ }
+ }
@Override
public Optional<TableRuntimeCreator> accept(
ServerTableIdentifier tableIdentifier, Map<String, String> tableProperties) {
- if (tableIdentifier
- .getFormat()
- .in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG)) {
- return Optional.of(new TableRuntimeCreatorImpl());
+ TableFormat format = tableIdentifier.getFormat();
+ boolean defaultSupported =
+ format.in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG);
+ boolean hasProcessFactories = factoriesByFormat.containsKey(format);
+
+ if (!defaultSupported && !hasProcessFactories) {
+ return Optional.empty();
}
- return Optional.empty();
+
+ return Optional.of(new TableRuntimeCreatorImpl(format));
}
- private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator {
+ private class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator {
+
+ private final TableFormat format;
+
+ private TableRuntimeCreatorImpl(TableFormat format) {
+ this.format = format;
+ }
+
@Override
public List<StateKey<?>> requiredStateKeys() {
- return DefaultTableRuntime.REQUIRED_STATES;
+ Map<String, StateKey<?>> merged = new LinkedHashMap<>();
+ // 1) DefaultTableRuntime required states
+ for (StateKey<?> stateKey : DefaultTableRuntime.REQUIRED_STATES) {
+ merged.put(stateKey.getKey(), stateKey);
+ }
+
+ // 2) Extra states from all process factories for this format (if any)
+ Map<Action, ProcessFactory> byAction = factoriesByFormat.get(format);
+ if (byAction != null) {
+ Map<String, ProcessFactory> deFactories = new HashMap<>();
+ byAction.forEach((a, f) -> deFactories.putIfAbsent(f.name(), f));
+ deFactories
+ .values()
+ .forEach(
+ factory ->
+ factory
+ .requiredStates()
+ .forEach(
+ stateKey -> {
+ if (merged.containsKey(stateKey.getKey())) {
+ throw new IllegalStateException(
+ "Failed to initialize table runtime creator, due to stateKey: "
+ + stateKey.getKey()
+ + " declared by process-factory: "
+ + factory.name()
+ + " has already been defined.");
+ }
+ merged.put(stateKey.getKey(), stateKey);
+ }));
+ }
+
+ return Lists.newArrayList(merged.values());
}
@Override
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 464cd5601..a247b1804 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -44,6 +44,7 @@ import org.apache.amoro.shade.guava32.com.google.common.base.Objects;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.amoro.table.TableRuntimeFactory;
import org.apache.amoro.table.TableSummary;
import org.apache.amoro.utils.TablePropertyUtil;
import org.slf4j.Logger;
@@ -85,19 +86,19 @@ public class DefaultTableService extends PersistentBase implements TableService
private final CompletableFuture<Boolean> initialized = new CompletableFuture<>();
private final Configurations serverConfiguration;
private final CatalogManager catalogManager;
- private final TableRuntimeFactoryManager tableRuntimeFactoryManager;
+ private final TableRuntimeFactory tableRuntimeFactory;
private RuntimeHandlerChain headHandler;
private ExecutorService tableExplorerExecutors;
public DefaultTableService(
Configurations configuration,
CatalogManager catalogManager,
- TableRuntimeFactoryManager tableRuntimeFactoryManager) {
+ TableRuntimeFactory tableRuntimeFactory) {
this.catalogManager = catalogManager;
this.externalCatalogRefreshingInterval =
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
this.serverConfiguration = configuration;
- this.tableRuntimeFactoryManager = tableRuntimeFactoryManager;
+ this.tableRuntimeFactory = tableRuntimeFactory;
}
@Override
@@ -515,21 +516,19 @@ public class DefaultTableService extends PersistentBase implements TableService
ServerTableIdentifier identifier,
TableRuntimeMeta runtimeMeta,
List<TableRuntimeState> restoredStates) {
- return tableRuntimeFactoryManager.installedPlugins().stream()
- .map(f -> f.accept(identifier, runtimeMeta.getTableConfig()))
- .filter(Optional::isPresent)
- .map(Optional::get)
- .findFirst()
- .map(
- creator -> {
- DefaultTableRuntimeStore store =
- new DefaultTableRuntimeStore(
- identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates);
- store.setRuntimeHandler(this);
- TableRuntime tableRuntime = creator.create(store);
- store.setTableRuntime(tableRuntime);
- return tableRuntime;
- });
+ Optional<TableRuntimeFactory.TableRuntimeCreator> creatorOpt =
+ tableRuntimeFactory.accept(identifier, runtimeMeta.getTableConfig());
+
+ return creatorOpt.map(
+ creator -> {
+ DefaultTableRuntimeStore store =
+ new DefaultTableRuntimeStore(
+ identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates);
+ store.setRuntimeHandler(this);
+ TableRuntime tableRuntime = creator.create(store);
+ store.setTableRuntime(tableRuntime);
+ return tableRuntime;
+ });
}
private void revertTableRuntimeAdded(
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 9f3d25b3f..f7266ba04 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -26,12 +26,9 @@ import org.apache.amoro.server.process.ProcessService;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.DefaultTableRuntimeFactory;
import org.apache.amoro.server.table.DefaultTableService;
-import org.apache.amoro.server.table.TableRuntimeFactoryManager;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.mockito.Mockito;
import java.time.Duration;
@@ -40,26 +37,20 @@ public abstract class AMSServiceTestBase extends AMSManagerTestBase {
private static DefaultOptimizingService OPTIMIZING_SERVICE = null;
private static ProcessService PROCESS_SERVICE = null;
- private static TableRuntimeFactoryManager tableRuntimeFactoryManager = null;
-
@BeforeClass
public static void initTableService() {
DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory();
- tableRuntimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class);
- Mockito.when(tableRuntimeFactoryManager.installedPlugins())
- .thenReturn(Lists.newArrayList(runtimeFactory));
try {
Configurations configurations = new Configurations();
configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L));
configurations.set(
AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L));
TABLE_SERVICE =
- new DefaultTableService(
- new Configurations(), CATALOG_MANAGER, tableRuntimeFactoryManager);
+ new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory);
OPTIMIZING_SERVICE =
new DefaultOptimizingService(
configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE);
- PROCESS_SERVICE = new ProcessService(configurations, TABLE_SERVICE);
+ PROCESS_SERVICE = new ProcessService(TABLE_SERVICE);
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());
TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain());
@@ -80,7 +71,6 @@ public abstract class AMSServiceTestBase extends AMSManagerTestBase {
TABLE_SERVICE.dispose();
MetricManager.dispose();
EventsManager.dispose();
- tableRuntimeFactoryManager = null;
}
protected DefaultTableService tableService() {
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
index 792faebcf..898b3d015 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
@@ -41,7 +41,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
import java.util.List;
@@ -49,7 +48,7 @@ import java.util.List;
public class TestDefaultTableRuntimeHandler extends AMSTableTestBase {
private DefaultTableService tableService;
- private final TableRuntimeFactoryManager runtimeFactoryManager;
+ private final DefaultTableRuntimeFactory runtimeFactory;
@Parameterized.Parameters(name = "{0}, {1}")
public static Object[] parameters() {
@@ -66,16 +65,12 @@ public class TestDefaultTableRuntimeHandler extends AMSTableTestBase {
public TestDefaultTableRuntimeHandler(
CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
super(catalogTestHelper, tableTestHelper, false);
- DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory();
- runtimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class);
- Mockito.when(runtimeFactoryManager.installedPlugins())
- .thenReturn(Lists.newArrayList(runtimeFactory));
+ this.runtimeFactory = new DefaultTableRuntimeFactory();
}
@Test
public void testInitialize() throws Exception {
- tableService =
- new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager);
+ tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory);
TestHandler handler = new TestHandler();
tableService.addHandlerChain(handler);
tableService.initialize();
@@ -94,8 +89,7 @@ public class TestDefaultTableRuntimeHandler extends AMSTableTestBase {
Assert.assertTrue(handler.isDisposed());
// initialize with a history table
- tableService =
- new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager);
+ tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory);
handler = new TestHandler();
tableService.addHandlerChain(handler);
tableService.initialize();
@@ -132,8 +126,7 @@ public class TestDefaultTableRuntimeHandler extends AMSTableTestBase {
@Test
public void testRefreshUpdatesOptimizerGroup() throws Exception {
- tableService =
- new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager);
+ tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory);
TestHandler handler = new TestHandler();
tableService.addHandlerChain(handler);
tableService.initialize();
diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
index 90e26bd4b..5799166c5 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
@@ -18,7 +18,6 @@
package org.apache.amoro.table;
-import org.apache.amoro.ActivePlugin;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.process.ActionCoordinator;
@@ -29,7 +28,7 @@ import java.util.Map;
import java.util.Optional;
/** Table runtime factory. */
-public interface TableRuntimeFactory extends ActivePlugin {
+public interface TableRuntimeFactory {
List<ActionCoordinator> supportedCoordinators();