wardlican commented on code in PR #3950:
URL: https://github.com/apache/amoro/pull/3950#discussion_r3007320307


##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java:
##########
@@ -41,21 +43,90 @@ public class AbstractOptimizerOperator implements 
Serializable {
   private final OptimizerConfig config;
   private final AtomicReference<String> token = new AtomicReference<>();
   private volatile boolean stopped = false;
+  private transient volatile AmsNodeManager amsNodeManager;
+  private transient volatile ThriftAmsNodeManager thriftAmsNodeManager;
 
   public AbstractOptimizerOperator(OptimizerConfig config) {
     Preconditions.checkNotNull(config);
     this.config = config;
+    if (config.isMasterSlaveMode()) {
+      String amsUrl = config.getAmsUrl();
+      if (amsUrl.startsWith("zookeeper://")) {
+        // ZK mode: discover nodes from ZooKeeper ephemeral nodes
+        try {
+          this.amsNodeManager = new AmsNodeManager(amsUrl);
+          LOG.info("Initialized ZK AmsNodeManager for master-slave mode");
+        } catch (Exception e) {
+          LOG.warn("Failed to initialize AmsNodeManager, will use single AMS 
URL", e);
+        }
+      } else {
+        // DB mode (or direct thrift://): discover nodes via 
getOptimizingNodeUrls() Thrift RPC
+        try {
+          this.thriftAmsNodeManager = new ThriftAmsNodeManager(amsUrl);
+          LOG.info("Initialized ThriftAmsNodeManager for master-slave mode (DB 
HA)");
+        } catch (Exception e) {
+          LOG.warn("Failed to initialize ThriftAmsNodeManager, will use single 
AMS URL", e);
+        }
+      }
+    }
+  }
+
+  /** Get the AmsNodeManager instance if available (ZK mode). */
+  protected AmsNodeManager getAmsNodeManager() {
+    return amsNodeManager;
+  }
+
+  /** Get the ThriftAmsNodeManager instance if available (DB mode). */
+  protected ThriftAmsNodeManager getThriftAmsNodeManager() {
+    return thriftAmsNodeManager;
+  }
+
+  /**
+   * Returns true if any node manager (ZK or Thrift) is active. Used by 
OptimizerExecutor to decide
+   * whether to use master-slave polling logic.
+   */
+  protected boolean hasAmsNodeManager() {
+    return amsNodeManager != null || thriftAmsNodeManager != null;
+  }
+
+  /**
+   * Get all AMS URLs from the active node manager. Falls back to the single 
configured URL when no
+   * node manager is available or the node list is empty.
+   */
+  protected List<String> getAmsNodeUrls() {
+    if (amsNodeManager != null) {
+      List<String> urls = amsNodeManager.getAllAmsUrls();
+      if (!urls.isEmpty()) {
+        return urls;
+      }
+    }
+    if (thriftAmsNodeManager != null) {
+      return thriftAmsNodeManager.getAllAmsUrls();
+    }
+    return Collections.singletonList(getConfig().getAmsUrl());
   }
 
   protected <T> T callAms(AmsCallOperation<T> operation) throws TException {
+    return callAms(getConfig().getAmsUrl(), operation);

Review Comment:
   Under the current design, the optimizer registers solely with the primary 
node to obtain a token. Each AMS node then automatically synchronizes the list 
of currently registered optimizers from the database; once this synchronization 
is complete, the optimizer is able to pull tasks from any of the other AMS 
nodes. Shifting to a model where each AMS node independently maintains tokens 
would introduce excessive complexity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to