This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new bb140417b Add a registration function for table allocation in master-slave mode. (#3920)
bb140417b is described below
commit bb140417bdfd7de6401c9f660ea9c76eee884381
Author: can <[email protected]>
AuthorDate: Fri Mar 6 11:02:36 2026 +0800
Add a registration function for table allocation in master-slave mode. (#3920)
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Replace ZK with mocks in tests. #3919
* [Subtask]: Replace ZK with mocks in tests. #3919
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Use a new configuration item to control whether master-slave mode is enabled. #3845
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Replace ZK with mocks in tests. #3919
* [Subtask]: Replace ZK with mocks in tests. #3919
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919
* [Subtask]: Fix conflicts after a force push following a rebase.
* [Subtask]: Rename registAndElect to registerAndElect.
* [Subtask]: Revise based on code review comments.
* [Subtask]: Revise based on code review comments.
---------
Co-authored-by: wardli <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 2 +-
.../ha/DataBaseHighAvailabilityContainer.java | 77 ++-
.../amoro/server/ha/HighAvailabilityContainer.java | 20 +-
.../server/ha/NoopHighAvailabilityContainer.java | 14 +-
.../server/ha/ZkHighAvailabilityContainer.java | 107 +++-
.../server/persistence/mapper/HaLeaseMapper.java | 14 +
.../server/ha/TestZkHighAvailabilityContainer.java | 566 +++++++++++++++++++++
.../apache/amoro/properties/AmsHAProperties.java | 5 +
8 files changed, 797 insertions(+), 8 deletions(-)
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 6fa3293e9..5d76e3983 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -171,7 +171,7 @@ public class AmoroServiceContainer {
   }
 
   public void registAndElect() throws Exception {
-    haContainer.registAndElect();
+    haContainer.registerAndElect();
   }
 
   public enum HAState {
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
index 8241e0802..c97951515 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -28,6 +28,8 @@ import org.apache.amoro.utils.JacksonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -135,6 +137,44 @@ public class DataBaseHighAvailabilityContainer extends PersistentBase
     LOG.info("Became the follower of AMS (Database lease)");
   }
 
+  @Override
+  public void registerAndElect() throws Exception {
+    boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
+    if (!isMasterSlaveMode) {
+      LOG.debug("Master-slave mode is not enabled, skip node registration");
+      return;
+    }
+    // In master-slave mode, register node to database by writing OPTIMIZING_SERVICE info
+    // This is similar to ZK mode registering ephemeral nodes
+    long now = System.currentTimeMillis();
+    String optimizingInfoJson = JacksonUtil.toJSONString(optimizingServiceServerInfo);
+    try {
+      doAsIgnoreError(
+          HaLeaseMapper.class,
+          mapper -> {
+            int updated =
+                mapper.updateServerInfo(
+                    clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
+            if (updated == 0) {
+              mapper.insertServerInfoIfAbsent(
+                  clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
+            }
+          });
+      LOG.info(
+          "Registered AMS node to database: nodeId={}, optimizingService={}",
+          nodeId,
+          optimizingServiceServerInfo);
+    } catch (Exception e) {
+      LOG.error("Failed to register node to database", e);
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean hasLeadership() {
+    return isLeader.get();
+  }
+
   /** Closes the heartbeat executor safely. */
   @Override
   public void close() {
@@ -147,9 +187,6 @@ public class DataBaseHighAvailabilityContainer extends PersistentBase
     }
   }
 
-  @Override
-  public void registAndElect() throws Exception {}
-
   private class HeartbeatRunnable implements Runnable {
     @Override
     public void run() {
@@ -304,6 +341,40 @@ public class DataBaseHighAvailabilityContainer extends PersistentBase
     }
   }
 
+  @Override
+  public List<AmsServerInfo> getAliveNodes() {
+    List<AmsServerInfo> aliveNodes = new ArrayList<>();
+    if (!isLeader.get()) {
+      LOG.warn("Only leader node can get alive nodes list");
+      return aliveNodes;
+    }
+    try {
+      long currentTime = System.currentTimeMillis();
+      List<HaLeaseMeta> leases =
+          getAs(
+              HaLeaseMapper.class,
+              mapper -> mapper.selectLeasesByService(clusterName, OPTIMIZING_SERVICE));
+      for (HaLeaseMeta lease : leases) {
+        // Only include nodes with valid (non-expired) leases
+        if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > currentTime) {
+          if (lease.getServerInfoJson() != null && !lease.getServerInfoJson().isEmpty()) {
+            try {
+              AmsServerInfo nodeInfo =
+                  JacksonUtil.parseObject(lease.getServerInfoJson(), AmsServerInfo.class);
+              aliveNodes.add(nodeInfo);
+            } catch (Exception e) {
+              LOG.warn("Failed to parse server info for node {}", lease.getNodeId(), e);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to get alive nodes from database", e);
+      throw e;
+    }
+    return aliveNodes;
+  }
+
   private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) {
     AmsServerInfo amsServerInfo = new AmsServerInfo();
     amsServerInfo.setHost(host);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
index 041c1e469..7139bb679 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -18,6 +18,10 @@
 
 package org.apache.amoro.server.ha;
 
+import org.apache.amoro.client.AmsServerInfo;
+
+import java.util.List;
+
 /**
  * Common interface for high availability (HA) containers.
  *
@@ -49,5 +53,19 @@ public interface HighAvailabilityContainer {
    *
    * @throws Exception If registration fails or participation in the primary election fails.
    */
-  void registAndElect() throws Exception;
+  void registerAndElect() throws Exception;
+
+  /**
+   * Used in master-slave mode to obtain information about all currently registered AMS nodes.
+   *
+   * @return List<AmsServerInfo>
+   */
+  List<AmsServerInfo> getAliveNodes();
+
+  /**
+   * Used to determine whether the current AMS node is the primary node.
+   *
+   * @return true if the current AMS node is the primary node, false otherwise
+   */
+  boolean hasLeadership();
 }
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
index ef55b9ac7..f5fd040af 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -18,9 +18,11 @@
 
 package org.apache.amoro.server.ha;
 
+import org.apache.amoro.client.AmsServerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 /** No-op HA container that never blocks and performs no leader election. */
@@ -48,5 +50,15 @@ public class NoopHighAvailabilityContainer implements HighAvailabilityContainer
   }
 
   @Override
-  public void registAndElect() throws Exception {}
+  public void registerAndElect() throws Exception {}
+
+  @Override
+  public List<AmsServerInfo> getAliveNodes() {
+    return List.of();
+  }
+
+  @Override
+  public boolean hasLeadership() {
+    return false;
+  }
 }
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index de25d4901..7c070f3b6 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -46,6 +46,8 @@ import javax.security.auth.login.Configuration;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -55,13 +57,27 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
 
   private final LeaderLatch leaderLatch;
   private final CuratorFramework zkClient;
+
+  // Package-private accessors for testing
+  CuratorFramework getZkClient() {
+    return zkClient;
+  }
+
+  LeaderLatch getLeaderLatch() {
+    return leaderLatch;
+  }
+
   private final String tableServiceMasterPath;
   private final String optimizingServiceMasterPath;
+  private final String nodesPath;
   private final AmsServerInfo tableServiceServerInfo;
   private final AmsServerInfo optimizingServiceServerInfo;
+  private final boolean isMasterSlaveMode;
   private volatile CountDownLatch followerLatch;
+  private String registeredNodePath;
 
   public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exception {
+    this.isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
     if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
       String zkServerAddress = serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS);
       int zkSessionTimeout =
@@ -71,6 +87,7 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
       String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
       tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName);
       optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName);
+      nodesPath = AmsHAProperties.getNodesPath(haClusterName);
       ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
       setupZookeeperAuth(serviceConfig);
       this.zkClient =
@@ -83,6 +100,7 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
       zkClient.start();
       createPathIfNeeded(tableServiceMasterPath);
       createPathIfNeeded(optimizingServiceMasterPath);
+      createPathIfNeeded(nodesPath);
       String leaderPath = AmsHAProperties.getLeaderPath(haClusterName);
       createPathIfNeeded(leaderPath);
       leaderLatch = new LeaderLatch(zkClient, leaderPath);
@@ -103,8 +121,10 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
       zkClient = null;
       tableServiceMasterPath = null;
       optimizingServiceMasterPath = null;
+      nodesPath = null;
       tableServiceServerInfo = null;
       optimizingServiceServerInfo = null;
+      registeredNodePath = null;
       // block follower latch forever when ha is disabled
       followerLatch = new CountDownLatch(1);
     }
@@ -141,8 +161,25 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
   }
 
   @Override
-  public void registAndElect() throws Exception {
-    // TODO Here you can register for AMS and participate in the election.
+  public void registerAndElect() throws Exception {
+    if (!isMasterSlaveMode) {
+      LOG.debug("Master-slave mode is not enabled, skip node registration");
+      return;
+    }
+    if (zkClient == null || nodesPath == null) {
+      LOG.warn("HA is not enabled, skip node registration");
+      return;
+    }
+    // Register node to ZK using ephemeral node
+    // The node will be automatically deleted when the session expires
+    String nodeInfo = JacksonUtil.toJSONString(optimizingServiceServerInfo);
+    registeredNodePath =
+        zkClient
+            .create()
+            .creatingParentsIfNeeded()
+            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+            .forPath(nodesPath + "/node-", nodeInfo.getBytes(StandardCharsets.UTF_8));
+    LOG.info("Registered AMS node to ZK: {}", registeredNodePath);
   }
 
   @Override
@@ -158,6 +195,18 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
   public void close() {
     if (leaderLatch != null) {
       try {
+        // Unregister node from ZK
+        if (registeredNodePath != null) {
+          try {
+            zkClient.delete().forPath(registeredNodePath);
+            LOG.info("Unregistered AMS node from ZK: {}", registeredNodePath);
+          } catch (KeeperException.NoNodeException e) {
+            // Node already deleted, ignore
+            LOG.debug("Node {} already deleted", registeredNodePath);
+          } catch (Exception e) {
+            LOG.warn("Failed to unregister node from ZK: {}", registeredNodePath, e);
+          }
+        }
         this.leaderLatch.close();
         this.zkClient.close();
       } catch (IOException e) {
@@ -192,6 +241,60 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
     return amsServerInfo;
   }
 
+  /**
+   * Get list of alive nodes. Only the leader node can call this method.
+   *
+   * @return List of alive node information
+   */
+  public List<AmsServerInfo> getAliveNodes() {
+    List<AmsServerInfo> aliveNodes = new ArrayList<>();
+    if (!isMasterSlaveMode) {
+      LOG.debug("Master-slave mode is not enabled, return empty node list");
+      return aliveNodes;
+    }
+    if (zkClient == null || nodesPath == null) {
+      LOG.warn("HA is not enabled, return empty node list");
+      return aliveNodes;
+    }
+    if (!leaderLatch.hasLeadership()) {
+      LOG.warn("Only leader node can get alive nodes list");
+      return aliveNodes;
+    }
+    try {
+      List<String> nodePaths = zkClient.getChildren().forPath(nodesPath);
+      for (String nodePath : nodePaths) {
+        try {
+          String fullPath = nodesPath + "/" + nodePath;
+          byte[] data = zkClient.getData().forPath(fullPath);
+          if (data != null && data.length > 0) {
+            String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
+            AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class);
+            aliveNodes.add(nodeInfo);
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed to get node info for path: {}", nodePath, e);
+        }
+      }
+    } catch (KeeperException.NoNodeException e) {
+      LOG.debug("Nodes path {} does not exist", nodesPath);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return aliveNodes;
+  }
+
+  /**
+   * Check if current node is the leader.
+   *
+   * @return true if current node is the leader, false otherwise
+   */
+  public boolean hasLeadership() {
+    if (leaderLatch == null) {
+      return false;
+    }
+    return leaderLatch.hasLeadership();
+  }
+
   private void createPathIfNeeded(String path) throws Exception {
     try {
       zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
index c3ce95d74..b7f772373 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
@@ -107,6 +107,20 @@ public interface HaLeaseMapper {
   HaLeaseMeta selectLease(
       @Param("clusterName") String clusterName, @Param("serviceName") String serviceName);
 
+  /**
+   * Select all leases for cluster and service.
+   *
+   * @param clusterName cluster name
+   * @param serviceName service name
+   * @return list of lease rows
+   */
+  @Select(
+      "SELECT cluster_name, service_name, node_id, node_ip, server_info_json, lease_expire_ts, version, updated_at "
+          + "FROM ha_lease WHERE cluster_name = #{clusterName} AND service_name = #{serviceName}")
+  @ResultMap("HaLeaseMetaMap")
+  List<HaLeaseMeta> selectLeasesByService(
+      @Param("clusterName") String clusterName, @Param("serviceName") String serviceName);
+
   /**
    * Select current lease for cluster and service.
    *
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java
new file mode 100644
index 000000000..3f9433132
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.properties.AmsHAProperties;
+import org.apache.amoro.server.AmoroManagementConf;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
+import org.apache.amoro.utils.JacksonUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test for HighAvailabilityContainer using mocked ZK to avoid connection
issues. */
+public class TestZkHighAvailabilityContainer {
+
+ private Configurations serviceConfig;
+ private HighAvailabilityContainer haContainer;
+ private MockZkState mockZkState;
+ private CuratorFramework mockZkClient;
+ private LeaderLatch mockLeaderLatch;
+
+ @Before
+ public void setUp() throws Exception {
+ mockZkState = new MockZkState();
+ mockZkClient = createMockZkClient();
+ mockLeaderLatch = createMockLeaderLatch();
+
+ // Create test configuration
+ serviceConfig = new Configurations();
+ serviceConfig.setString(AmoroManagementConf.SERVER_EXPOSE_HOST,
"127.0.0.1");
+
serviceConfig.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT,
1260);
+
serviceConfig.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
1261);
+ serviceConfig.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, 1630);
+ serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, true);
+ serviceConfig.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS,
"127.0.0.1:2181");
+ serviceConfig.setString(AmoroManagementConf.HA_CLUSTER_NAME,
"test-cluster");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (haContainer != null) {
+ haContainer.close();
+ }
+ mockZkState.clear();
+ }
+
+ @Test
+ public void testRegisterAndElectWithoutMasterSlaveMode() throws Exception {
+ // Test that node registration is skipped when master-slave mode is
disabled
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
+ haContainer = createContainerWithMockZk();
+
+ // Should not throw exception and should not register node
+ haContainer.registerAndElect();
+
+ // Verify no node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> children = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals(
+ "No nodes should be registered when master-slave mode is disabled", 0,
children.size());
+ }
+
+ @Test
+ public void testRegisterAndElectWithMasterSlaveMode() throws Exception {
+ // Test that node registration works when master-slave mode is enabled
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Verify node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> children = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("One node should be registered", 1, children.size());
+
+ // Verify node data
+ String nodePath = nodesPath + "/" + children.get(0);
+ byte[] data = mockZkState.getData(nodePath);
+ Assert.assertNotNull("Node data should not be null", data);
+ Assert.assertTrue("Node data should not be empty", data.length > 0);
+
+ // Verify node info
+ String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
+ AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson,
AmsServerInfo.class);
+ Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost());
+ Assert.assertEquals(
+ "Thrift port should match", Integer.valueOf(1261),
nodeInfo.getThriftBindPort());
+ }
+
+ @Test
+ public void testGetAliveNodesWithoutMasterSlaveMode() throws Exception {
+ // Test that getAliveNodes returns empty list when master-slave mode is
disabled
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
+ haContainer = createContainerWithMockZk();
+
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+ Assert.assertEquals(
+ "Alive nodes list should be empty when master-slave mode is disabled",
+ 0,
+ aliveNodes.size());
+ }
+
+ @Test
+ public void testGetAliveNodesWhenNotLeader() throws Exception {
+ // Test that getAliveNodes returns empty list when not leader
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ mockLeaderLatch = createMockLeaderLatch(false); // Not leader
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Since we're not the leader, should return empty list
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+ Assert.assertEquals("Alive nodes list should be empty when not leader", 0,
aliveNodes.size());
+ }
+
+ @Test
+ public void testGetAliveNodesAsLeader() throws Exception {
+ // Test that getAliveNodes returns nodes when leader
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ mockLeaderLatch = createMockLeaderLatch(true); // Is leader
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Verify we are leader
+ Assert.assertTrue("Should be leader", haContainer.hasLeadership());
+
+ // Get alive nodes
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+ Assert.assertEquals("Should have one alive node", 1, aliveNodes.size());
+
+ // Verify node info
+ AmsServerInfo nodeInfo = aliveNodes.get(0);
+ Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost());
+ Assert.assertEquals(
+ "Thrift port should match", Integer.valueOf(1261),
nodeInfo.getThriftBindPort());
+ Assert.assertEquals(
+ "HTTP port should match", Integer.valueOf(1630),
nodeInfo.getRestBindPort());
+ }
+
+ @Test
+ public void testGetAliveNodesWithMultipleNodes() throws Exception {
+ // Test that getAliveNodes returns all registered nodes
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ mockLeaderLatch = createMockLeaderLatch(true); // Is leader
+ haContainer = createContainerWithMockZk();
+
+ // Register first node
+ haContainer.registerAndElect();
+
+ // Verify first node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> childrenAfterFirst = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("First node should be registered", 1,
childrenAfterFirst.size());
+
+ // Register second node manually in mock state
+ // Use createNode with sequential path to get the correct sequence number
+ AmsServerInfo nodeInfo2 = new AmsServerInfo();
+ nodeInfo2.setHost("127.0.0.2");
+ nodeInfo2.setThriftBindPort(1262);
+ nodeInfo2.setRestBindPort(1631);
+ String nodeInfo2Json = JacksonUtil.toJSONString(nodeInfo2);
+ // Use sequential path ending with "-" to let createNode generate the
sequence number
+ // This ensures the second node gets the correct sequence number
(0000000001)
+ mockZkState.createNode(nodesPath + "/node-",
nodeInfo2Json.getBytes(StandardCharsets.UTF_8));
+
+ // Get alive nodes
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+ Assert.assertEquals("Should have two alive nodes", 2, aliveNodes.size());
+ }
+
+ @Test
+ public void testCloseUnregistersNode() throws Exception {
+ // Test that close() unregisters the node
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Verify node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> children = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("One node should be registered", 1, children.size());
+
+ // Close container
+ haContainer.close();
+ haContainer = null;
+
+ // Verify node was unregistered
+ List<String> childrenAfterClose = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("No nodes should be registered after close", 0,
childrenAfterClose.size());
+ }
+
+ @Test
+ public void testHasLeadership() throws Exception {
+ // Test hasLeadership() method
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ mockLeaderLatch = createMockLeaderLatch(false); // Not leader initially
+ haContainer = createContainerWithMockZk();
+
+ // Initially should not be leader
+ Assert.assertFalse("Should not be leader initially",
haContainer.hasLeadership());
+
+ // Change to leader
+ mockLeaderLatch = createMockLeaderLatch(true);
+ haContainer = createContainerWithMockZk();
+
+ // Should be leader now
+ Assert.assertTrue("Should be leader", haContainer.hasLeadership());
+ }
+
+ @Test
+ public void testRegisterAndElectWithoutHAEnabled() throws Exception {
+ // Test that registAndElect skips when HA is not enabled
+ serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ haContainer = new ZkHighAvailabilityContainer(serviceConfig);
+
+ // Should not throw exception
+ haContainer.registerAndElect();
+ }
+
+ /** Create HighAvailabilityContainer with mocked ZK components using
reflection. */
+ private HighAvailabilityContainer createContainerWithMockZk() throws
Exception {
+ // Create container without ZK connection to avoid any connection attempts
+ HighAvailabilityContainer container = createContainerWithoutZk();
+
+ // Inject mock ZK client and leader latch (fields are on
ZkHighAvailabilityContainer)
+ java.lang.reflect.Field zkClientField =
+ ZkHighAvailabilityContainer.class.getDeclaredField("zkClient");
+ zkClientField.setAccessible(true);
+ zkClientField.set(container, mockZkClient);
+
+ java.lang.reflect.Field leaderLatchField =
+ ZkHighAvailabilityContainer.class.getDeclaredField("leaderLatch");
+ leaderLatchField.setAccessible(true);
+ leaderLatchField.set(container, mockLeaderLatch);
+
+ // Note: We don't need to create the paths themselves as nodes in ZK
+ // ZK paths are logical containers, not actual nodes
+ // The createPathIfNeeded() calls will be handled by the mock when needed
+
+ return container;
+ }
+
+ /**
+ * Create a HighAvailabilityContainer without initializing ZK connection.
This is used when we
+ * want to completely avoid ZK connection attempts.
+ *
+ * <p>Uses ZkHighAvailabilityContainer (which has the constructor and
fields);
+ * HighAvailabilityContainer is an interface without constructors or
instance fields.
+ */
+ private HighAvailabilityContainer createContainerWithoutZk() throws
Exception {
+ // ZkHighAvailabilityContainer has constructor (Configurations);
HighAvailabilityContainer is an
+ // interface
+ java.lang.reflect.Constructor<ZkHighAvailabilityContainer> constructor =
+
ZkHighAvailabilityContainer.class.getDeclaredConstructor(Configurations.class);
+
+ // Create a minimal config that disables HA to avoid ZK connection
+ Configurations tempConfig = new Configurations(serviceConfig);
+ tempConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+
+ HighAvailabilityContainer container = constructor.newInstance(tempConfig);
+
+ // Now set all required fields using reflection (fields are on
ZkHighAvailabilityContainer)
+ java.lang.reflect.Field isMasterSlaveModeField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("isMasterSlaveMode");
+ isMasterSlaveModeField.setAccessible(true);
+ isMasterSlaveModeField.set(
+ container,
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE));
+
+ if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+ String haClusterName =
serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+ java.lang.reflect.Field tableServiceMasterPathField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceMasterPath");
+ tableServiceMasterPathField.setAccessible(true);
+ tableServiceMasterPathField.set(
+ container, AmsHAProperties.getTableServiceMasterPath(haClusterName));
+
+ java.lang.reflect.Field optimizingServiceMasterPathField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceMasterPath");
+ optimizingServiceMasterPathField.setAccessible(true);
+ optimizingServiceMasterPathField.set(
+ container,
AmsHAProperties.getOptimizingServiceMasterPath(haClusterName));
+
+ java.lang.reflect.Field nodesPathField =
+ ZkHighAvailabilityContainer.class.getDeclaredField("nodesPath");
+ nodesPathField.setAccessible(true);
+ nodesPathField.set(container,
AmsHAProperties.getNodesPath(haClusterName));
+
+ java.lang.reflect.Field tableServiceServerInfoField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceServerInfo");
+ tableServiceServerInfoField.setAccessible(true);
+ AmsServerInfo tableServiceServerInfo =
+ buildServerInfo(
+ serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+
serviceConfig.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT),
+ serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+ tableServiceServerInfoField.set(container, tableServiceServerInfo);
+
+ java.lang.reflect.Field optimizingServiceServerInfoField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceServerInfo");
+ optimizingServiceServerInfoField.setAccessible(true);
+ AmsServerInfo optimizingServiceServerInfo =
+ buildServerInfo(
+ serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT),
+ serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+ optimizingServiceServerInfoField.set(container,
optimizingServiceServerInfo);
+ }
+
+ return container;
+ }
+
+ /** Helper method to build AmsServerInfo (copied from
HighAvailabilityContainer). */
+ private AmsServerInfo buildServerInfo(String host, Integer thriftPort,
Integer httpPort) {
+ AmsServerInfo serverInfo = new AmsServerInfo();
+ serverInfo.setHost(host);
+ serverInfo.setThriftBindPort(thriftPort);
+ serverInfo.setRestBindPort(httpPort);
+ return serverInfo;
+ }
+
+ /** Create a mock CuratorFramework that uses MockZkState for storage. */
+ @SuppressWarnings("unchecked")
+ private CuratorFramework createMockZkClient() throws Exception {
+ CuratorFramework mockClient = mock(CuratorFramework.class);
+
+ // Mock getChildren() - create a chain of mocks
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
+ getChildrenBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+ .GetChildrenBuilder.class);
+ when(mockClient.getChildren()).thenReturn(getChildrenBuilder);
+ when(getChildrenBuilder.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ return mockZkState.getChildren(path);
+ });
+
+ // Mock getData()
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+ getDataBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+ .class);
+ when(mockClient.getData()).thenReturn(getDataBuilder);
+ when(getDataBuilder.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ return mockZkState.getData(path);
+ });
+
+ // Mock create() - manually create the entire fluent API chain to ensure
consistency
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder
createBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder.class);
+
+ @SuppressWarnings("unchecked")
+ org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+ .ProtectACLCreateModeStatPathAndBytesable<
+ String>
+ pathAndBytesable =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+ .ProtectACLCreateModeStatPathAndBytesable.class);
+
+ when(mockClient.create()).thenReturn(createBuilder);
+
+ // Mock the chain: creatingParentsIfNeeded() -> withMode() -> forPath()
+ // Use the same mock object for the entire chain
+ when(createBuilder.creatingParentsIfNeeded()).thenReturn(pathAndBytesable);
+
when(pathAndBytesable.withMode(any(CreateMode.class))).thenReturn(pathAndBytesable);
+
+ // Mock forPath(path, data) - used by registAndElect()
+ when(pathAndBytesable.forPath(anyString(), any(byte[].class)))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ byte[] data = invocation.getArgument(1);
+ return mockZkState.createNode(path, data);
+ });
+
+ // Mock forPath(path) - used by createPathIfNeeded()
+ // Note: createPathIfNeeded() creates paths without data, but we still
need to store them
+ // so that getChildren() can work correctly
+ when(pathAndBytesable.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ // Create the path as an empty node (this simulates ZK path
creation)
+ // In real ZK, paths are logical containers, but we need to
store them
+ // to make getChildren() work correctly
+ if (mockZkState.exists(path) == null) {
+ mockZkState.createNode(path, new byte[0]);
+ }
+ return null;
+ });
+
+ // Mock delete()
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder
deleteBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder.class);
+ when(mockClient.delete()).thenReturn(deleteBuilder);
+ doAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ mockZkState.deleteNode(path);
+ return null;
+ })
+ .when(deleteBuilder)
+ .forPath(anyString());
+
+ // Mock checkExists()
+ @SuppressWarnings("unchecked")
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+ checkExistsBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+ .class);
+ when(mockClient.checkExists()).thenReturn(checkExistsBuilder);
+ when(checkExistsBuilder.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ return mockZkState.exists(path);
+ });
+
+ // Mock start() and close()
+ doAnswer(invocation -> null).when(mockClient).start();
+ doAnswer(invocation -> null).when(mockClient).close();
+
+ return mockClient;
+ }
+
+ /** Create a mock LeaderLatch. */
+ private LeaderLatch createMockLeaderLatch() throws Exception {
+ return createMockLeaderLatch(true);
+ }
+
+ /** Create a mock LeaderLatch with specified leadership status. */
+ private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws
Exception {
+ LeaderLatch mockLatch = mock(LeaderLatch.class);
+ when(mockLatch.hasLeadership()).thenReturn(hasLeadership);
+ doAnswer(invocation -> null).when(mockLatch).addListener(any());
+ doAnswer(invocation -> null).when(mockLatch).start();
+ doAnswer(invocation -> null).when(mockLatch).close();
+ // Mock await() - it throws IOException and InterruptedException
+ doAnswer(
+ invocation -> {
+ // Mock implementation - doesn't actually wait
+ return null;
+ })
+ .when(mockLatch)
+ .await();
+ return mockLatch;
+ }
+
+ /** In-memory ZK state simulator. */
+ private static class MockZkState {
+ private final Map<String, byte[]> nodes = new HashMap<>();
+ private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+ public List<String> getChildren(String path) throws KeeperException {
+ List<String> children = new ArrayList<>();
+ String prefix = path.endsWith("/") ? path : path + "/";
+ for (String nodePath : nodes.keySet()) {
+ // Only include direct children (not the path itself, and not nested
paths)
+ if (nodePath.startsWith(prefix) && !nodePath.equals(path)) {
+ String relativePath = nodePath.substring(prefix.length());
+ // Only add direct children (no additional slashes)
+ // This means the path should be exactly: prefix + relativePath
+ if (!relativePath.contains("/")) {
+ children.add(relativePath);
+ }
+ }
+ }
+ // Sort to ensure consistent ordering
+ children.sort(String::compareTo);
+ return children;
+ }
+
+ public byte[] getData(String path) throws KeeperException {
+ byte[] data = nodes.get(path);
+ if (data == null) {
+ throw new KeeperException.NoNodeException(path);
+ }
+ return data;
+ }
+
+ public String createNode(String path, byte[] data) {
+ // Handle sequential nodes
+ if (path.endsWith("-")) {
+ int seq = sequenceCounter.incrementAndGet();
+ path = path + String.format("%010d", seq);
+ }
+ nodes.put(path, data);
+ return path;
+ }
+
+ public void deleteNode(String path) throws KeeperException {
+ if (!nodes.containsKey(path)) {
+ throw new KeeperException.NoNodeException(path);
+ }
+ nodes.remove(path);
+ }
+
+ public Stat exists(String path) {
+ return nodes.containsKey(path) ? new Stat() : null;
+ }
+
+ public void clear() {
+ nodes.clear();
+ sequenceCounter.set(0);
+ }
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
index e794b520d..08b9ef04a 100644
---
a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
+++
b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
@@ -25,6 +25,7 @@ public class AmsHAProperties {
private static final String LEADER_PATH = "/leader";
private static final String TABLE_SERVICE_MASTER_PATH = "/master";
private static final String OPTIMIZING_SERVICE_MASTER_PATH =
"/optimizing-service-master";
+ private static final String NODES_PATH = "/nodes";
private static final String NAMESPACE_DEFAULT = "default";
private static String getBasePath(String namespace) {
@@ -45,4 +46,8 @@ public class AmsHAProperties {
public static String getLeaderPath(String namespace) {
return getBasePath(namespace) + LEADER_PATH;
}
+
+ public static String getNodesPath(String namespace) {
+ return getBasePath(namespace) + NODES_PATH;
+ }
}