xxubai commented on code in PR #4100:
URL: https://github.com/apache/amoro/pull/4100#discussion_r2887851737


##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java:
##########
@@ -28,45 +29,108 @@
 import org.apache.amoro.table.TableRuntimeFactory;
 import org.apache.amoro.table.TableRuntimeStore;
 
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
+/**
+ * Default {@link TableRuntimeFactory} implementation used by AMS.
+ *
+ * <p>Besides creating {@link DefaultTableRuntime} instances for mixed/iceberg 
formats, this factory
+ * also aggregates {@link ProcessFactory} declarations to expose {@link 
ActionCoordinator} plugins
+ * for different {@link TableFormat}/{@link Action} combinations.
+ */
 public class DefaultTableRuntimeFactory implements TableRuntimeFactory {
-  @Override
-  public void open(Map<String, String> properties) {}
 
-  @Override
-  public void close() {}
+  /** Mapping from table format to its supported actions and corresponding 
process factory. */
+  private final Map<TableFormat, Map<Action, ProcessFactory>> 
factoriesByFormat = new HashMap<>();
 
-  @Override
-  public String name() {
-    return "default";
-  }
+  /** Coordinators derived from all installed process factories. */
+  private final List<ActionCoordinator> supportedCoordinators = 
Lists.newArrayList();
 
   @Override
   public List<ActionCoordinator> supportedCoordinators() {
-    return Lists.newArrayList();
+    return supportedCoordinators;
   }
 
   @Override
-  public void initialize(List<ProcessFactory> factories) {}
+  public void initialize(List<ProcessFactory> factories) {
+    factoriesByFormat.clear();
+    supportedCoordinators.clear();
+
+    for (ProcessFactory factory : factories) {
+      Map<TableFormat, Set<Action>> supported = factory.supportedActions();
+      if (supported == null || supported.isEmpty()) {
+        continue;
+      }
+
+      for (Map.Entry<TableFormat, Set<Action>> entry : supported.entrySet()) {
+        TableFormat format = entry.getKey();
+        Map<Action, ProcessFactory> byAction =
+            factoriesByFormat.computeIfAbsent(format, k -> new HashMap<>());
+
+        for (Action action : entry.getValue()) {
+          ProcessFactory existed = byAction.get(action);
+          if (existed != null && existed != factory) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "ProcessFactory conflict for format %s and action %s, 
existing: %s, new: %s",
+                    format, action, existed.name(), factory.name()));
+          }
+          byAction.put(action, factory);
+          supportedCoordinators.add(new DefaultActionCoordinator(format, 
action, factory));
+        }
+      }
+    }
+  }
 
   @Override
   public Optional<TableRuntimeCreator> accept(
       ServerTableIdentifier tableIdentifier, Map<String, String> 
tableProperties) {
-    if (tableIdentifier
-        .getFormat()
-        .in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, 
TableFormat.ICEBERG)) {
-      return Optional.of(new TableRuntimeCreatorImpl());
+    TableFormat format = tableIdentifier.getFormat();
+    boolean defaultSupported =
+        format.in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, 
TableFormat.ICEBERG);
+    boolean hasProcessFactories = factoriesByFormat.containsKey(format);
+
+    if (!defaultSupported && !hasProcessFactories) {
+      return Optional.empty();
     }
-    return Optional.empty();
+
+    return Optional.of(new TableRuntimeCreatorImpl(format));
   }
 
-  private static class TableRuntimeCreatorImpl implements 
TableRuntimeFactory.TableRuntimeCreator {
+  private class TableRuntimeCreatorImpl implements 
TableRuntimeFactory.TableRuntimeCreator {
+
+    private final TableFormat format;
+
+    private TableRuntimeCreatorImpl(TableFormat format) {
+      this.format = format;
+    }
+
     @Override
     public List<StateKey<?>> requiredStateKeys() {
-      return DefaultTableRuntime.REQUIRED_STATES;
+      Map<String, StateKey<?>> merged = new LinkedHashMap<>();
+      // 1) DefaultTableRuntime required states
+      for (StateKey<?> stateKey : DefaultTableRuntime.REQUIRED_STATES) {
+        merged.put(stateKey.getKey(), stateKey);
+      }
+
+      // 2) Extra states from all process factories for this format (if any)
+      Map<Action, ProcessFactory> byAction = factoriesByFormat.get(format);

Review Comment:
   When a single `ProcessFactory` supports multiple actions for the same 
format, `byAction.values()` will contain that factory instance multiple times. 
Each invocation of `factory.requiredStates()` produces the same keys, so a 
repeated invocation will hit the `merged.containsKey` check and throw an 
`IllegalStateException`.
   
   Suggested fix: deduplicate the factories in `byAction.values()` before 
iterating over them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to