czy006 commented on code in PR #3922:
URL: https://github.com/apache/amoro/pull/3922#discussion_r2903425110
##########
amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java:
##########
@@ -240,6 +241,22 @@ public void startOptimizingService() throws Exception {
DefaultTableRuntimeFactory defaultRuntimeFactory = new
DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);
+ // In master-slave mode, create AmsAssignService for bucket assignment
+ if (IS_MASTER_SLAVE_MODE && haContainer != null) {
+ try {
+ // Create and start AmsAssignService for bucket assignment
+ // The factory will handle different HA types (ZK, database, etc.)
+ amsAssignService = new AmsAssignService(haContainer, serviceConfig);
+ amsAssignService.start();
+ LOG.info("AmsAssignService started for master-slave mode");
+ } catch (UnsupportedOperationException e) {
Review Comment:
We only need to check whether it is in master-slave mode. Why is it
necessary to use exception handling to skip this step here?
##########
amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for assigning bucket IDs to AMS nodes in master-slave mode.
Periodically detects node
+ * changes and redistributes bucket IDs evenly.
+ */
+public class AmsAssignService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AmsAssignService.class);
+
+ private final ScheduledExecutorService assignScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("ams-assign-scheduler-%d")
+ .setDaemon(true)
+ .build());
+
+ private final HighAvailabilityContainer haContainer;
+ private final BucketAssignStore assignStore;
+ private final Configurations serviceConfig;
+ private final int bucketIdTotalCount;
+ private final long nodeOfflineTimeoutMs;
+ private final long assignIntervalSeconds;
+ private volatile boolean running = false;
+
+ boolean isRunning() {
+ return running;
+ }
+
+ void doAssignForTest() {
+ doAssign();
+ }
+
+ public AmsAssignService(HighAvailabilityContainer haContainer,
Configurations serviceConfig) {
+ this.haContainer = haContainer;
+ this.serviceConfig = serviceConfig;
+ this.bucketIdTotalCount =
serviceConfig.getInteger(AmoroManagementConf.BUCKET_ID_TOTAL_COUNT);
+ this.nodeOfflineTimeoutMs =
+ serviceConfig.get(AmoroManagementConf.NODE_OFFLINE_TIMEOUT).toMillis();
+ this.assignIntervalSeconds =
+ serviceConfig.get(AmoroManagementConf.ASSIGN_INTERVAL).getSeconds();
+ this.assignStore = BucketAssignStoreFactory.create(haContainer,
serviceConfig);
+ }
+
+ /**
+ * Start the assignment service. Only works in master-slave mode and when
current node is leader.
+ */
+ public void start() {
+ if (!serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)) {
+ LOG.info("Master-slave mode is not enabled, skip starting bucket
assignment service");
+ return;
+ }
+ if (running) {
+ LOG.warn("Bucket assignment service is already running");
+ return;
+ }
+ running = true;
+ assignScheduler.scheduleWithFixedDelay(
+ this::doAssign, 10, assignIntervalSeconds, TimeUnit.SECONDS);
+ LOG.info("Bucket assignment service started with interval: {} seconds",
assignIntervalSeconds);
+ }
+
+ /** Stop the assignment service. */
+ public void stop() {
+ if (!running) {
+ return;
+ }
+ running = false;
+ assignScheduler.shutdown();
+ try {
+ if (!assignScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+ assignScheduler.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ assignScheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ LOG.info("Bucket assignment service stopped");
+ }
+
+ private void doAssign() {
+ try {
+ if (!haContainer.hasLeadership()) {
+ LOG.debug("Current node is not leader, skip bucket assignment");
+ return;
+ }
+
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ if (aliveNodes.isEmpty()) {
+ LOG.debug("No alive nodes found, skip bucket assignment");
+ return;
+ }
+
+ Map<AmsServerInfo, List<String>> currentAssignments =
assignStore.getAllAssignments();
+
+ // Create a mapping from stored nodes (may have null restBindPort) to
alive nodes (complete
+ // info)
+ // Use host:thriftBindPort as the key for matching
+ Map<String, AmsServerInfo> aliveNodeMap = new java.util.HashMap<>();
+ for (AmsServerInfo node : aliveNodes) {
+ String key = getNodeKey(node);
+ aliveNodeMap.put(key, node);
+ }
+
+ // Normalize current assignments: map stored nodes to their
corresponding alive nodes
+ Map<AmsServerInfo, List<String>> normalizedAssignments = new
java.util.HashMap<>();
+ Set<AmsServerInfo> currentAssignedNodes = new HashSet<>();
+ for (Map.Entry<AmsServerInfo, List<String>> entry :
currentAssignments.entrySet()) {
+ AmsServerInfo storedNode = entry.getKey();
+ String nodeKey = getNodeKey(storedNode);
+ AmsServerInfo aliveNode = aliveNodeMap.get(nodeKey);
+ if (aliveNode != null) {
+ // Node is alive, use the complete node info from aliveNodes
+ normalizedAssignments.put(aliveNode, entry.getValue());
+ currentAssignedNodes.add(aliveNode);
+ } else {
+ // Node is not in alive list, keep the stored node info for offline
detection
+ normalizedAssignments.put(storedNode, entry.getValue());
+ currentAssignedNodes.add(storedNode);
+ }
+ }
+
+ Set<AmsServerInfo> aliveNodeSet = new HashSet<>(aliveNodes);
+
+ // Detect new nodes and offline nodes
+ Set<AmsServerInfo> newNodes = new HashSet<>(aliveNodeSet);
+ newNodes.removeAll(currentAssignedNodes);
+
+ Set<AmsServerInfo> offlineNodes = new HashSet<>();
+ for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+ String nodeKey = getNodeKey(storedNode);
+ if (!aliveNodeMap.containsKey(nodeKey)) {
+ offlineNodes.add(storedNode);
+ }
+ }
+
+ // Check for nodes that haven't updated for a long time
+ long currentTime = System.currentTimeMillis();
+ Set<String> aliveNodeKeys = new HashSet<>();
+ for (AmsServerInfo node : aliveNodes) {
+ aliveNodeKeys.add(getNodeKey(node));
+ }
+ for (AmsServerInfo node : currentAssignedNodes) {
+ String nodeKey = getNodeKey(node);
+ if (aliveNodeKeys.contains(nodeKey)) {
+ long lastUpdateTime = assignStore.getLastUpdateTime(node);
+ if (lastUpdateTime > 0 && (currentTime - lastUpdateTime) >
nodeOfflineTimeoutMs) {
+ // Find the stored node for this alive node to add to offlineNodes
+ for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+ if (getNodeKey(storedNode).equals(nodeKey)) {
+ offlineNodes.add(storedNode);
+ break;
+ }
+ }
+ LOG.warn(
+ "Node {} is considered offline due to timeout. Last update:
{}",
+ node,
+ lastUpdateTime);
+ }
+ }
+ }
+
+ boolean needReassign = !newNodes.isEmpty() || !offlineNodes.isEmpty();
+
+ if (needReassign) {
+ LOG.info(
+ "Detected node changes - New nodes: {}, Offline nodes: {},
Performing incremental reassignment...",
+ newNodes.size(),
+ offlineNodes.size());
+
+ // Step 1: Handle offline nodes - collect their buckets for
redistribution
+ List<String> bucketsToRedistribute = new ArrayList<>();
+ for (AmsServerInfo offlineNode : offlineNodes) {
+ try {
+ List<String> offlineBuckets = currentAssignments.get(offlineNode);
+ if (offlineBuckets != null && !offlineBuckets.isEmpty()) {
+ bucketsToRedistribute.addAll(offlineBuckets);
+ LOG.info(
+ "Collected {} buckets from offline node {} for
redistribution",
+ offlineBuckets.size(),
+ offlineNode);
+ }
+ assignStore.removeAssignments(offlineNode);
+ } catch (Exception e) {
+ LOG.warn("Failed to remove assignments for offline node {}",
offlineNode, e);
+ }
+ }
+
+ // Step 2: Calculate target assignment for balanced distribution
+ List<String> allBuckets = generateBucketIds();
+ int totalBuckets = allBuckets.size();
+ int totalAliveNodes = aliveNodes.size();
+ int targetBucketsPerNode = totalBuckets / totalAliveNodes;
+ int remainder = totalBuckets % totalAliveNodes;
+
+ // Step 3: Incremental reassignment
+ // Keep existing assignments for nodes that are still alive
+ Map<AmsServerInfo, List<String>> newAssignments = new
java.util.HashMap<>();
+ Set<String> offlineNodeKeys = new HashSet<>();
+ for (AmsServerInfo offlineNode : offlineNodes) {
+ offlineNodeKeys.add(getNodeKey(offlineNode));
+ }
+ for (AmsServerInfo node : aliveNodes) {
+ String nodeKey = getNodeKey(node);
+ if (!offlineNodeKeys.contains(nodeKey)) {
+ // Node is alive and not offline, check if it has existing
assignments
+ List<String> existingBuckets = normalizedAssignments.get(node);
+ if (existingBuckets != null && !existingBuckets.isEmpty()) {
+ // Keep existing buckets for alive nodes (not offline)
+ newAssignments.put(node, new ArrayList<>(existingBuckets));
+ } else {
+ // New node
+ newAssignments.put(node, new ArrayList<>());
+ }
+ } else {
+ // Node was offline, start with empty assignment
+ newAssignments.put(node, new ArrayList<>());
+ }
+ }
+
+ // Step 4: Redistribute buckets from offline nodes to alive nodes
+ if (!bucketsToRedistribute.isEmpty()) {
+ redistributeBucketsIncrementally(
+ aliveNodes, bucketsToRedistribute, newAssignments,
targetBucketsPerNode);
Review Comment:
`targetBucketsPerNode` is not used here — is it intended for a future feature or the next PR?
##########
amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.properties.AmsHAProperties;
+import
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ZooKeeper-based implementation of BucketAssignStore. Stores bucket ID
assignments in ZooKeeper
+ * with the following structure:
/{namespace}/amoro/ams/bucket-assignments/{nodeKey}/assignments -
+ * bucket IDs
/{namespace}/amoro/ams/bucket-assignments/{nodeKey}/last-update-time - timestamp
+ */
+public class ZkBucketAssignStore implements BucketAssignStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ZkBucketAssignStore.class);
+ private static final String ASSIGNMENTS_SUFFIX = "/assignments";
+ private static final String LAST_UPDATE_TIME_SUFFIX = "/last-update-time";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final TypeReference<List<String>> LIST_STRING_TYPE =
+ new TypeReference<List<String>>() {};
+
+ private final CuratorFramework zkClient;
+ private final String assignmentsBasePath;
+
+ public ZkBucketAssignStore(CuratorFramework zkClient, String clusterName) {
+ this.zkClient = zkClient;
+ this.assignmentsBasePath =
AmsHAProperties.getBucketAssignmentsPath(clusterName);
+ try {
+ createPathIfNeeded(assignmentsBasePath);
+ } catch (Exception e) {
+ LOG.error("Failed to create bucket assignments path", e);
+ throw new RuntimeException("Failed to initialize ZkBucketAssignStore",
e);
+ }
+ }
+
+ @Override
+ public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
throws Exception {
+ String nodeKey = getNodeKey(nodeInfo);
+ String assignmentsPath = assignmentsBasePath + "/" + nodeKey +
ASSIGNMENTS_SUFFIX;
+ String assignmentsJson = JacksonUtil.toJSONString(bucketIds);
+ try {
+ if (zkClient.checkExists().forPath(assignmentsPath) != null) {
+ zkClient
+ .setData()
+ .forPath(assignmentsPath,
assignmentsJson.getBytes(StandardCharsets.UTF_8));
+ } else {
+ zkClient
+ .create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(assignmentsPath,
assignmentsJson.getBytes(StandardCharsets.UTF_8));
+ }
+ updateLastUpdateTime(nodeInfo);
+ LOG.debug("Saved bucket assignments for node {}: {}", nodeKey,
bucketIds);
+ } catch (Exception e) {
+ LOG.error("Failed to save bucket assignments for node {}", nodeKey, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public List<String> getAssignments(AmsServerInfo nodeInfo) throws Exception {
+ String nodeKey = getNodeKey(nodeInfo);
+ String assignmentsPath = assignmentsBasePath + "/" + nodeKey +
ASSIGNMENTS_SUFFIX;
+ try {
+ if (zkClient.checkExists().forPath(assignmentsPath) == null) {
+ return new ArrayList<>();
+ }
+ byte[] data = zkClient.getData().forPath(assignmentsPath);
+ if (data == null || data.length == 0) {
+ return new ArrayList<>();
+ }
+ String assignmentsJson = new String(data, StandardCharsets.UTF_8);
+ return OBJECT_MAPPER.readValue(assignmentsJson, LIST_STRING_TYPE);
+ } catch (KeeperException.NoNodeException e) {
+ return new ArrayList<>();
+ } catch (Exception e) {
+ LOG.error("Failed to get bucket assignments for node {}", nodeKey, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void removeAssignments(AmsServerInfo nodeInfo) throws Exception {
+ String nodeKey = getNodeKey(nodeInfo);
+ String nodePath = assignmentsBasePath + "/" + nodeKey;
+ try {
+ if (zkClient.checkExists().forPath(nodePath) != null) {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(nodePath);
+ LOG.debug("Removed bucket assignments for node {}", nodeKey);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Already deleted, ignore
+ } catch (Exception e) {
+ LOG.error("Failed to remove bucket assignments for node {}", nodeKey, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public Map<AmsServerInfo, List<String>> getAllAssignments() throws Exception
{
+ Map<AmsServerInfo, List<String>> allAssignments = new HashMap<>();
+ try {
+ if (zkClient.checkExists().forPath(assignmentsBasePath) == null) {
+ return allAssignments;
+ }
+ List<String> nodeKeys =
zkClient.getChildren().forPath(assignmentsBasePath);
+ for (String nodeKey : nodeKeys) {
+ try {
+ AmsServerInfo nodeInfo = parseNodeKey(nodeKey);
+ List<String> bucketIds = getAssignments(nodeInfo);
+ if (!bucketIds.isEmpty()) {
+ allAssignments.put(nodeInfo, bucketIds);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to parse node key or get assignments: {}", nodeKey,
e);
+ }
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Path doesn't exist, return empty map
+ } catch (Exception e) {
+ LOG.error("Failed to get all bucket assignments", e);
+ throw e;
+ }
+ return allAssignments;
+ }
+
+ @Override
+ public long getLastUpdateTime(AmsServerInfo nodeInfo) throws Exception {
+ String nodeKey = getNodeKey(nodeInfo);
+ String timePath = assignmentsBasePath + "/" + nodeKey +
LAST_UPDATE_TIME_SUFFIX;
+ try {
+ if (zkClient.checkExists().forPath(timePath) == null) {
+ return 0;
+ }
+ byte[] data = zkClient.getData().forPath(timePath);
+ if (data == null || data.length == 0) {
+ return 0;
+ }
+ return Long.parseLong(new String(data, StandardCharsets.UTF_8));
+ } catch (KeeperException.NoNodeException e) {
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to get last update time for node {}", nodeKey, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws Exception {
+ String nodeKey = getNodeKey(nodeInfo);
+ String timePath = assignmentsBasePath + "/" + nodeKey +
LAST_UPDATE_TIME_SUFFIX;
+ long currentTime = System.currentTimeMillis();
+ String timeStr = String.valueOf(currentTime);
+ try {
+ if (zkClient.checkExists().forPath(timePath) != null) {
+ zkClient.setData().forPath(timePath,
timeStr.getBytes(StandardCharsets.UTF_8));
+ } else {
+ zkClient
+ .create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(timePath, timeStr.getBytes(StandardCharsets.UTF_8));
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to update last update time for node {}", nodeKey, e);
+ throw e;
+ }
+ }
+
+ private String getNodeKey(AmsServerInfo nodeInfo) {
+ return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+ }
+
+ private AmsServerInfo parseNodeKey(String nodeKey) {
+ String[] parts = nodeKey.split(":");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid node key format: " +
nodeKey);
+ }
+ AmsServerInfo nodeInfo = new AmsServerInfo();
+ nodeInfo.setHost(parts[0]);
+ nodeInfo.setThriftBindPort(Integer.parseInt(parts[1]));
Review Comment:
`nodeInfo` is missing `restBindPort` here, but `equals` includes `restBindPort` in its comparison — won't that break matching?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]