baiyangtx commented on code in PR #4100:
URL: https://github.com/apache/amoro/pull/4100#discussion_r2887453827
##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java:
##########
@@ -28,45 +29,108 @@
import org.apache.amoro.table.TableRuntimeFactory;
import org.apache.amoro.table.TableRuntimeStore;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+/**
+ * Default {@link TableRuntimeFactory} implementation used by AMS.
+ *
+ * <p>Besides creating {@link DefaultTableRuntime} instances for mixed/iceberg formats, this factory
+ * also aggregates {@link ProcessFactory} declarations to expose {@link ActionCoordinator} plugins
+ * for different {@link TableFormat}/{@link Action} combinations.
+ */
public class DefaultTableRuntimeFactory implements TableRuntimeFactory {
- @Override
- public void open(Map<String, String> properties) {}
- @Override
- public void close() {}
+ /** Mapping from table format to its supported actions and corresponding process factory. */
+ private final Map<TableFormat, Map<Action, ProcessFactory>>
factoriesByFormat = new HashMap<>();
- @Override
- public String name() {
- return "default";
- }
+ /** Coordinators derived from all installed process factories. */
+ private final List<ActionCoordinator> supportedCoordinators =
Lists.newArrayList();
@Override
public List<ActionCoordinator> supportedCoordinators() {
- return Lists.newArrayList();
+ return supportedCoordinators;
}
@Override
- public void initialize(List<ProcessFactory> factories) {}
+ public void initialize(List<ProcessFactory> factories) {
+ factoriesByFormat.clear();
+ supportedCoordinators.clear();
+
+ for (ProcessFactory factory : factories) {
+ Map<TableFormat, Set<Action>> supported = factory.supportedActions();
+ if (supported == null || supported.isEmpty()) {
+ continue;
+ }
+
+ for (Map.Entry<TableFormat, Set<Action>> entry : supported.entrySet()) {
+ TableFormat format = entry.getKey();
+ Map<Action, ProcessFactory> byAction =
+ factoriesByFormat.computeIfAbsent(format, k -> new HashMap<>());
+
+ for (Action action : entry.getValue()) {
+ ProcessFactory existed = byAction.get(action);
+ if (existed != null && existed != factory) {
+ throw new IllegalArgumentException(
+ String.format(
+ "ProcessFactory conflict for format %s and action %s,
existing: %s, new: %s",
+ format, action, existed.name(), factory.name()));
+ }
+ byAction.put(action, factory);
+ supportedCoordinators.add(new DefaultActionCoordinator(format,
action, factory));
+ }
+ }
+ }
+ }
@Override
public Optional<TableRuntimeCreator> accept(
ServerTableIdentifier tableIdentifier, Map<String, String>
tableProperties) {
- if (tableIdentifier
- .getFormat()
- .in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE,
TableFormat.ICEBERG)) {
- return Optional.of(new TableRuntimeCreatorImpl());
+ TableFormat format = tableIdentifier.getFormat();
+ boolean defaultSupported =
+ format.in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE,
TableFormat.ICEBERG);
+ boolean hasProcessFactories = factoriesByFormat.containsKey(format);
+
+ if (!defaultSupported && !hasProcessFactories) {
+ return Optional.empty();
}
- return Optional.empty();
+
+ return Optional.of(new TableRuntimeCreatorImpl(format));
}
- private static class TableRuntimeCreatorImpl implements
TableRuntimeFactory.TableRuntimeCreator {
+ private class TableRuntimeCreatorImpl implements
TableRuntimeFactory.TableRuntimeCreator {
+
+ private final TableFormat format;
+
+ private TableRuntimeCreatorImpl(TableFormat format) {
+ this.format = format;
+ }
+
@Override
public List<StateKey<?>> requiredStateKeys() {
- return DefaultTableRuntime.REQUIRED_STATES;
+ Map<String, StateKey<?>> merged = new LinkedHashMap<>();
+ // 1) DefaultTableRuntime required states
+ for (StateKey<?> stateKey : DefaultTableRuntime.REQUIRED_STATES) {
+ merged.put(stateKey.getKey(), stateKey);
+ }
+
+ // 2) Extra states from all process factories for this format (if any)
+ Map<Action, ProcessFactory> byAction = factoriesByFormat.get(format);
Review Comment:
I will add a check to throw an exception when conflicts occur.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]