czy006 commented on code in PR #3937:
URL: https://github.com/apache/amoro/pull/3937#discussion_r2740936116


##########
amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java:
##########
@@ -301,6 +346,40 @@ private void onLeaderLost() {
     }
   }
 
+  @Override
+  public List<AmsServerInfo> getAliveNodes() throws Exception {
+    List<AmsServerInfo> aliveNodes = new ArrayList<>();
+    if (!isLeader.get()) {
+      LOG.warn("Only leader node can get alive nodes list");
+      return aliveNodes;
+    }
+    try {
+      long currentTime = System.currentTimeMillis();
+      List<HaLeaseMeta> leases =
+          getAs(
+              HaLeaseMapper.class,
+              mapper -> mapper.selectLeasesByService(clusterName, 
OPTIMIZING_SERVICE));
+      for (HaLeaseMeta lease : leases) {
+        // Only include nodes with valid (non-expired) leases
+        if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > 
currentTime) {

Review Comment:
   Checking only the lease expiration timestamp may still return nodes whose 
leases have just expired but have not yet been cleaned up.



##########
amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java:
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for assigning bucket IDs to AMS nodes in master-slave mode. 
Periodically detects node
+ * changes and redistributes bucket IDs evenly.
+ */
+public class AmsAssignService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AmsAssignService.class);
+
+  private final ScheduledExecutorService assignScheduler =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("ams-assign-scheduler-%d")
+              .setDaemon(true)
+              .build());
+
+  private final HighAvailabilityContainer haContainer;
+  private final BucketAssignStore assignStore;
+  private final Configurations serviceConfig;
+  private final int bucketIdTotalCount;
+  private final long nodeOfflineTimeoutMs;
+  private final long assignIntervalSeconds;
+  private volatile boolean running = false;
+
+  // Package-private accessors for testing
+  BucketAssignStore getAssignStore() {
+    return assignStore;
+  }
+
+  boolean isRunning() {
+    return running;
+  }
+
+  void doAssignForTest() {
+    doAssign();
+  }
+
+  /**
+   * Constructor for AmsAssignService. Creates appropriate BucketAssignStore 
based on HA container
+   * type.
+   *
+   * @param haContainer High availability container (ZK or Database)
+   * @param serviceConfig Service configuration
+   * @param zkClient ZooKeeper client (null for Database mode)
+   */
+  public AmsAssignService(
+      HighAvailabilityContainer haContainer,
+      Configurations serviceConfig,
+      CuratorFramework zkClient) {
+    this.haContainer = haContainer;
+    this.serviceConfig = serviceConfig;
+    this.bucketIdTotalCount = 
serviceConfig.getInteger(AmoroManagementConf.BUCKET_ID_TOTAL_COUNT);
+    this.nodeOfflineTimeoutMs =
+        serviceConfig.get(AmoroManagementConf.NODE_OFFLINE_TIMEOUT).toMillis();
+    this.assignIntervalSeconds =
+        serviceConfig.get(AmoroManagementConf.ASSIGN_INTERVAL).getSeconds();
+    String clusterName = 
serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+    // Create appropriate BucketAssignStore based on HA container type
+    if (haContainer instanceof 
org.apache.amoro.server.ha.ZkHighAvailabilityContainer) {
+      if (zkClient == null) {
+        throw new IllegalArgumentException(
+            "ZooKeeper client is required for ZkHighAvailabilityContainer");
+      }
+      this.assignStore = new ZkBucketAssignStore(zkClient, clusterName);
+    } else if (haContainer
+        instanceof 
org.apache.amoro.server.ha.DataBaseHighAvailabilityContainer) {
+      this.assignStore = new DatabaseBucketAssignStore(clusterName);
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported HA container type: " + 
haContainer.getClass().getName());
+    }
+  }
+
+  /**
+   * Start the assignment service. Only works in master-slave mode and when 
current node is leader.
+   */
+  public void start() {

Review Comment:
   These operations are non-transactional and appear to run without any 
locking. Could this lead to inconsistent bucket assignments during a leader 
switchover?



##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java:
##########
@@ -79,10 +189,54 @@ public void start() {
     }
   }
 
+  /**
+   * Get the list of AMS nodes to interact with. In master-slave mode, returns 
all available nodes.
+   * In single node mode, returns a list with the configured AMS URL.
+   */
+  private List<String> getAmsNodeList() {
+    if (getAmsNodeManager() != null) {
+      List<String> nodes = getAmsNodeManager().getAllAmsUrls();
+      if (!nodes.isEmpty()) {
+        return nodes;
+      }
+    }
+    // Fallback to single node mode
+    return Collections.singletonList(getConfig().getAmsUrl());
+  }
+
   public int getThreadId() {
     return threadId;
   }
 
+  /**
+   * Poll task from the specified AMS node (used in master-slave mode).
+   *
+   * @param amsUrl The AMS node URL to poll task from
+   * @return The polled task, or null if no task available
+   */
+  private OptimizingTask pollTask(String amsUrl) {
+    OptimizingTask task = null;
+    try {
+      task = callAuthenticatedAms(amsUrl, (client, token) -> 
client.pollTask(token, threadId));
+      if (task != null) {
+        LOG.info(
+            "Optimizer executor[{}] polled task[{}] from AMS {}",
+            threadId,
+            task.getTaskId(),
+            amsUrl);
+      }
+    } catch (TException exception) {
+      LOG.error(

Review Comment:
   Can we retry polling for the task? If an exception occurs, could the task be lost?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to