czy006 commented on code in PR #3922:
URL: https://github.com/apache/amoro/pull/3922#discussion_r2917219384


##########
amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for creating BucketAssignStore implementations based on HA 
configuration.
+ *
+ * <p>Supports different storage backends (ZK, database) according to HA type.
+ */
+public final class BucketAssignStoreFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BucketAssignStoreFactory.class);
+
+  private BucketAssignStoreFactory() {}
+
+  /**
+   * Creates a BucketAssignStore based on the given HA configuration and 
container.
+   *
+   * @param haContainer the HA container
+   * @param conf service configuration
+   * @return a BucketAssignStore implementation according to HA type
+   * @throws IllegalArgumentException if HA type is unsupported
+   * @throws RuntimeException if the ZK store cannot be created
+   */
+  public static BucketAssignStore create(
+      HighAvailabilityContainer haContainer, Configurations conf) {
+    String haType = conf.getString(AmoroManagementConf.HA_TYPE).toLowerCase();
+    String clusterName = conf.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+    switch (haType) {
+      case AmoroManagementConf.HA_TYPE_ZK:
+        if (haContainer instanceof 
org.apache.amoro.server.ha.ZkHighAvailabilityContainer) {

Review Comment:
   Use the import `org.apache.amoro.server.ha.ZkHighAvailabilityContainer` instead of the fully qualified class name here.



##########
amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.properties.AmsHAProperties;
+import 
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ZooKeeper-based implementation of BucketAssignStore. Stores bucket ID 
assignments in ZooKeeper
+ * with the following structure: 
/{namespace}/amoro/ams/bucket-assignments/{nodeKey}/assignments -
+ * bucket IDs 
/{namespace}/amoro/ams/bucket-assignments/{nodeKey}/last-update-time - timestamp
+ */
+public class ZkBucketAssignStore implements BucketAssignStore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkBucketAssignStore.class);
+  private static final String ASSIGNMENTS_SUFFIX = "/assignments";
+  private static final String LAST_UPDATE_TIME_SUFFIX = "/last-update-time";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final TypeReference<List<String>> LIST_STRING_TYPE =
+      new TypeReference<List<String>>() {};
+
+  private final CuratorFramework zkClient;
+  private final String assignmentsBasePath;
+
+  public ZkBucketAssignStore(CuratorFramework zkClient, String clusterName) {
+    this.zkClient = zkClient;
+    this.assignmentsBasePath = 
AmsHAProperties.getBucketAssignmentsPath(clusterName);
+    try {
+      createPathIfNeeded(assignmentsBasePath);
+    } catch (Exception e) {
+      LOG.error("Failed to create bucket assignments path", e);
+      throw new RuntimeException("Failed to initialize ZkBucketAssignStore", 
e);
+    }
+  }
+
+  @Override
+  public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds) 
throws Exception {
+    String nodeKey = getNodeKey(nodeInfo);
+    String assignmentsPath = assignmentsBasePath + "/" + nodeKey + 
ASSIGNMENTS_SUFFIX;
+    String assignmentsJson = JacksonUtil.toJSONString(bucketIds);
+    try {
+      if (zkClient.checkExists().forPath(assignmentsPath) != null) {
+        zkClient
+            .setData()
+            .forPath(assignmentsPath, 
assignmentsJson.getBytes(StandardCharsets.UTF_8));
+      } else {
+        zkClient
+            .create()
+            .creatingParentsIfNeeded()
+            .withMode(CreateMode.PERSISTENT)
+            .forPath(assignmentsPath, 
assignmentsJson.getBytes(StandardCharsets.UTF_8));
+      }
+      updateLastUpdateTime(nodeInfo);
+      LOG.debug("Saved bucket assignments for node {}: {}", nodeKey, 
bucketIds);
+    } catch (Exception e) {
+      LOG.error("Failed to save bucket assignments for node {}", nodeKey, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public List<String> getAssignments(AmsServerInfo nodeInfo) throws Exception {
+    String nodeKey = getNodeKey(nodeInfo);
+    String assignmentsPath = assignmentsBasePath + "/" + nodeKey + 
ASSIGNMENTS_SUFFIX;
+    try {
+      if (zkClient.checkExists().forPath(assignmentsPath) == null) {
+        return new ArrayList<>();
+      }
+      byte[] data = zkClient.getData().forPath(assignmentsPath);
+      if (data == null || data.length == 0) {
+        return new ArrayList<>();
+      }
+      String assignmentsJson = new String(data, StandardCharsets.UTF_8);
+      return OBJECT_MAPPER.readValue(assignmentsJson, LIST_STRING_TYPE);

Review Comment:
   `JacksonUtil` can already do `readValue`. Do we need to maintain our own `OBJECT_MAPPER` here?



##########
amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java:
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for assigning bucket IDs to AMS nodes in master-slave mode. 
Periodically detects node
+ * changes and redistributes bucket IDs evenly.
+ */
+public class AmsAssignService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AmsAssignService.class);
+
+  private final ScheduledExecutorService assignScheduler =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("ams-assign-scheduler-%d")
+              .setDaemon(true)
+              .build());
+
+  private final HighAvailabilityContainer haContainer;
+  private final BucketAssignStore assignStore;
+  private final Configurations serviceConfig;
+  private final int bucketIdTotalCount;
+  private final long nodeOfflineTimeoutMs;
+  private final long assignIntervalSeconds;
+  private volatile boolean running = false;
+
+  boolean isRunning() {
+    return running;
+  }
+
+  void doAssignForTest() {
+    doAssign();
+  }
+
+  public AmsAssignService(HighAvailabilityContainer haContainer, 
Configurations serviceConfig) {
+    this.haContainer = haContainer;
+    this.serviceConfig = serviceConfig;
+    this.bucketIdTotalCount =
+        serviceConfig.getInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT);
+    this.nodeOfflineTimeoutMs =
+        serviceConfig.get(AmoroManagementConf.NODE_OFFLINE_TIMEOUT).toMillis();
+    this.assignIntervalSeconds =
+        serviceConfig.get(AmoroManagementConf.ASSIGN_INTERVAL).getSeconds();
+    this.assignStore = BucketAssignStoreFactory.create(haContainer, 
serviceConfig);
+  }
+
+  /**
+   * Start the assignment service. Only works in master-slave mode and when 
current node is leader.
+   */
+  public void start() {
+    if (!serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)) {
+      LOG.info("Master-slave mode is not enabled, skip starting bucket 
assignment service");
+      return;
+    }
+    if (running) {
+      LOG.warn("Bucket assignment service is already running");
+      return;
+    }
+    running = true;
+    assignScheduler.scheduleWithFixedDelay(
+        this::doAssign, 10, assignIntervalSeconds, TimeUnit.SECONDS);
+    LOG.info("Bucket assignment service started with interval: {} seconds", 
assignIntervalSeconds);
+  }
+
+  /** Stop the assignment service. */
+  public void stop() {
+    if (!running) {
+      return;
+    }
+    running = false;
+    assignScheduler.shutdown();
+    try {
+      if (!assignScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+        assignScheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      assignScheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("Bucket assignment service stopped");
+  }
+
+  private void doAssign() {
+    try {
+      if (!haContainer.hasLeadership()) {
+        LOG.debug("Current node is not leader, skip bucket assignment");
+        return;
+      }
+
+      List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+      if (aliveNodes.isEmpty()) {
+        LOG.debug("No alive nodes found, skip bucket assignment");
+        return;
+      }
+
+      Map<AmsServerInfo, List<String>> currentAssignments = 
assignStore.getAllAssignments();
+
+      // Create a mapping from stored nodes (may have null restBindPort) to 
alive nodes (complete
+      // info)
+      // Use host:thriftBindPort as the key for matching
+      Map<String, AmsServerInfo> aliveNodeMap = new java.util.HashMap<>();
+      for (AmsServerInfo node : aliveNodes) {
+        String key = getNodeKey(node);
+        aliveNodeMap.put(key, node);
+      }
+
+      // Normalize current assignments: map stored nodes to their 
corresponding alive nodes
+      Map<AmsServerInfo, List<String>> normalizedAssignments = new 
java.util.HashMap<>();
+      Set<AmsServerInfo> currentAssignedNodes = new HashSet<>();
+      for (Map.Entry<AmsServerInfo, List<String>> entry : 
currentAssignments.entrySet()) {
+        AmsServerInfo storedNode = entry.getKey();
+        String nodeKey = getNodeKey(storedNode);
+        AmsServerInfo aliveNode = aliveNodeMap.get(nodeKey);
+        if (aliveNode != null) {
+          // Node is alive, use the complete node info from aliveNodes
+          normalizedAssignments.put(aliveNode, entry.getValue());
+          currentAssignedNodes.add(aliveNode);
+        } else {
+          // Node is not in alive list, keep the stored node info for offline 
detection
+          normalizedAssignments.put(storedNode, entry.getValue());
+          currentAssignedNodes.add(storedNode);
+        }
+      }
+
+      Set<AmsServerInfo> aliveNodeSet = new HashSet<>(aliveNodes);
+
+      // Detect new nodes and offline nodes
+      Set<AmsServerInfo> newNodes = new HashSet<>(aliveNodeSet);
+      newNodes.removeAll(currentAssignedNodes);
+
+      Set<AmsServerInfo> offlineNodes = new HashSet<>();
+      for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+        String nodeKey = getNodeKey(storedNode);
+        if (!aliveNodeMap.containsKey(nodeKey)) {
+          offlineNodes.add(storedNode);
+        }
+      }
+
+      // Check for nodes that haven't updated for a long time
+      long currentTime = System.currentTimeMillis();
+      Set<String> aliveNodeKeys = new HashSet<>();
+      for (AmsServerInfo node : aliveNodes) {
+        aliveNodeKeys.add(getNodeKey(node));
+      }
+      for (AmsServerInfo node : currentAssignedNodes) {
+        String nodeKey = getNodeKey(node);
+        if (aliveNodeKeys.contains(nodeKey)) {
+          long lastUpdateTime = assignStore.getLastUpdateTime(node);
+          if (lastUpdateTime > 0 && (currentTime - lastUpdateTime) > 
nodeOfflineTimeoutMs) {
+            // Find the stored node for this alive node to add to offlineNodes
+            for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+              if (getNodeKey(storedNode).equals(nodeKey)) {
+                offlineNodes.add(storedNode);
+                break;
+              }
+            }
+            LOG.warn(
+                "Node {} is considered offline due to timeout. Last update: 
{}",
+                node,
+                lastUpdateTime);
+          }
+        }
+      }
+
+      boolean needReassign = !newNodes.isEmpty() || !offlineNodes.isEmpty();
+
+      if (needReassign) {
+        LOG.info(
+            "Detected node changes - New nodes: {}, Offline nodes: {}, 
Performing incremental reassignment...",
+            newNodes.size(),
+            offlineNodes.size());
+
+        // Step 1: Handle offline nodes - collect their buckets for 
redistribution
+        List<String> bucketsToRedistribute = new ArrayList<>();
+        for (AmsServerInfo offlineNode : offlineNodes) {
+          try {
+            List<String> offlineBuckets = currentAssignments.get(offlineNode);
+            if (offlineBuckets != null && !offlineBuckets.isEmpty()) {
+              bucketsToRedistribute.addAll(offlineBuckets);
+              LOG.info(
+                  "Collected {} buckets from offline node {} for 
redistribution",
+                  offlineBuckets.size(),
+                  offlineNode);
+            }
+            assignStore.removeAssignments(offlineNode);
+          } catch (Exception e) {
+            LOG.warn("Failed to remove assignments for offline node {}", 
offlineNode, e);
+          }
+        }
+
+        // Step 2: Calculate target assignment for balanced distribution
+        List<String> allBuckets = generateBucketIds();
+        int totalBuckets = allBuckets.size();
+        int totalAliveNodes = aliveNodes.size();
+        int targetBucketsPerNode = totalBuckets / totalAliveNodes;
+        int remainder = totalBuckets % totalAliveNodes;
+
+        // Step 3: Incremental reassignment
+        // Keep existing assignments for nodes that are still alive
+        Map<AmsServerInfo, List<String>> newAssignments = new 
java.util.HashMap<>();
+        Set<String> offlineNodeKeys = new HashSet<>();
+        for (AmsServerInfo offlineNode : offlineNodes) {
+          offlineNodeKeys.add(getNodeKey(offlineNode));
+        }
+        for (AmsServerInfo node : aliveNodes) {
+          String nodeKey = getNodeKey(node);
+          if (!offlineNodeKeys.contains(nodeKey)) {
+            // Node is alive and not offline, check if it has existing 
assignments
+            List<String> existingBuckets = normalizedAssignments.get(node);
+            if (existingBuckets != null && !existingBuckets.isEmpty()) {
+              // Keep existing buckets for alive nodes (not offline)
+              newAssignments.put(node, new ArrayList<>(existingBuckets));
+            } else {
+              // New node
+              newAssignments.put(node, new ArrayList<>());
+            }
+          } else {
+            // Node was offline, start with empty assignment
+            newAssignments.put(node, new ArrayList<>());
+          }
+        }
+
+        // Step 4: Redistribute buckets from offline nodes to alive nodes
+        if (!bucketsToRedistribute.isEmpty()) {
+          redistributeBucketsIncrementally(aliveNodes, bucketsToRedistribute, 
newAssignments);
+        }
+
+        // Step 5: Handle new nodes - balance buckets from existing nodes
+        if (!newNodes.isEmpty()) {
+          balanceBucketsForNewNodes(
+              aliveNodes, newNodes, newAssignments, targetBucketsPerNode, 
remainder);
+        }
+
+        // Step 6: Handle unassigned buckets (if any)
+        Set<String> allAssignedBuckets = new HashSet<>();
+        for (List<String> buckets : newAssignments.values()) {
+          allAssignedBuckets.addAll(buckets);
+        }
+        List<String> unassignedBuckets = new ArrayList<>();
+        for (String bucket : allBuckets) {
+          if (!allAssignedBuckets.contains(bucket)) {
+            unassignedBuckets.add(bucket);
+          }
+        }
+        if (!unassignedBuckets.isEmpty()) {
+          redistributeBucketsIncrementally(aliveNodes, unassignedBuckets, 
newAssignments);
+        }
+
+        // Step 7: Save all new assignments
+        for (Map.Entry<AmsServerInfo, List<String>> entry : 
newAssignments.entrySet()) {
+          try {
+            assignStore.saveAssignments(entry.getKey(), entry.getValue());
+            LOG.info(
+                "Assigned {} buckets to node {}: {}",
+                entry.getValue().size(),
+                entry.getKey(),
+                entry.getValue());
+          } catch (Exception e) {
+            LOG.error("Failed to save assignments for node {}", 
entry.getKey(), e);
+          }
+        }
+      } else {
+        // Update last update time for alive nodes
+        for (AmsServerInfo node : aliveNodes) {
+          assignStore.updateLastUpdateTime(node);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error during bucket assignment", e);
+    }
+  }
+
+  /**
+   * Redistribute buckets incrementally to alive nodes using round-robin. This 
minimizes bucket
+   * migration by only redistributing buckets from offline nodes.
+   *
+   * @param aliveNodes List of alive nodes
+   * @param bucketsToRedistribute Buckets to redistribute (from offline nodes)
+   * @param currentAssignments Current assignments map (will be modified)
+   */
+  private void redistributeBucketsIncrementally(
+      List<AmsServerInfo> aliveNodes,
+      List<String> bucketsToRedistribute,
+      Map<AmsServerInfo, List<String>> currentAssignments) {
+    if (aliveNodes.isEmpty() || bucketsToRedistribute.isEmpty()) {
+      return;
+    }
+
+    // Distribute buckets using round-robin to minimize migration
+    int nodeIndex = 0;
+    for (String bucketId : bucketsToRedistribute) {
+      AmsServerInfo node = aliveNodes.get(nodeIndex % aliveNodes.size());
+      currentAssignments.get(node).add(bucketId);

Review Comment:
   ```
     Node A: 80 buckets
     Node B: 20 buckets
     Node C: offline (has 50 buckets pending redistribution)
   
     run redistributeBucketsIncrementally:
     Node A: 80 + 17 = 97 buckets  ← round-robin once to give A
     Node B: 20 + 17 = 37 buckets
     Node C: 0 + 16 = 16 buckets   ← Offline node recovery
   ```
   
   
   
   This uses **round-robin** to distribute the buckets across aliveNodes; could we instead assign each bucket to the node that currently has the fewest buckets?
   ```
   for (String bucketId : bucketsToRedistribute) {
       AmsServerInfo target = aliveNodes.stream()
           .min(Comparator.comparingInt(n -> currentAssignments.get(n).size()))
           .orElse(aliveNodes.get(0));
       currentAssignments.get(target).add(bucketId);
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to