This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new bb140417b Add a registration function for table allocation in 
master-slave mode.  (#3920)
bb140417b is described below

commit bb140417bdfd7de6401c9f660ea9c76eee884381
Author: can <[email protected]>
AuthorDate: Fri Mar 6 11:02:36 2026 +0800

    Add a registration function for table allocation in master-slave mode.  
(#3920)
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Replace zk with mocking. #3919
    
    * [Subtask]: Replace zk with mocking. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Use a new configuration item to control whether master & slave 
mode is enabled. #3845
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Replace zk with mocking. #3919
    
    * [Subtask]: Replace zk with mocking. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Fixing conflicts after a forced push following a rebase.
    
    * [Subtask]: change registAndElect to registerAndElect.
    
    * [Subtask]: Revised based on CR’s comments
    
    * [Subtask]: Revised based on CR’s comments
    
    ---------
    
    Co-authored-by: wardli <[email protected]>
---
 .../apache/amoro/server/AmoroServiceContainer.java |   2 +-
 .../ha/DataBaseHighAvailabilityContainer.java      |  77 ++-
 .../amoro/server/ha/HighAvailabilityContainer.java |  20 +-
 .../server/ha/NoopHighAvailabilityContainer.java   |  14 +-
 .../server/ha/ZkHighAvailabilityContainer.java     | 107 +++-
 .../server/persistence/mapper/HaLeaseMapper.java   |  14 +
 .../server/ha/TestZkHighAvailabilityContainer.java | 566 +++++++++++++++++++++
 .../apache/amoro/properties/AmsHAProperties.java   |   5 +
 8 files changed, 797 insertions(+), 8 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 6fa3293e9..5d76e3983 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -171,7 +171,7 @@ public class AmoroServiceContainer {
   }
 
   public void registAndElect() throws Exception {
-    haContainer.registAndElect();
+    haContainer.registerAndElect();
   }
 
   public enum HAState {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
index 8241e0802..c97951515 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -28,6 +28,8 @@ import org.apache.amoro.utils.JacksonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -135,6 +137,44 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
     LOG.info("Became the follower of AMS (Database lease)");
   }
 
+  @Override
+  public void registerAndElect() throws Exception {
+    boolean isMasterSlaveMode = 
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
+    if (!isMasterSlaveMode) {
+      LOG.debug("Master-slave mode is not enabled, skip node registration");
+      return;
+    }
+    // In master-slave mode, register node to database by writing 
OPTIMIZING_SERVICE info
+    // This is similar to ZK mode registering ephemeral nodes
+    long now = System.currentTimeMillis();
+    String optimizingInfoJson = 
JacksonUtil.toJSONString(optimizingServiceServerInfo);
+    try {
+      doAsIgnoreError(
+          HaLeaseMapper.class,
+          mapper -> {
+            int updated =
+                mapper.updateServerInfo(
+                    clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, 
optimizingInfoJson, now);
+            if (updated == 0) {
+              mapper.insertServerInfoIfAbsent(
+                  clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, 
optimizingInfoJson, now);
+            }
+          });
+      LOG.info(
+          "Registered AMS node to database: nodeId={}, optimizingService={}",
+          nodeId,
+          optimizingServiceServerInfo);
+    } catch (Exception e) {
+      LOG.error("Failed to register node to database", e);
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean hasLeadership() {
+    return isLeader.get();
+  }
+
   /** Closes the heartbeat executor safely. */
   @Override
   public void close() {
@@ -147,9 +187,6 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
     }
   }
 
-  @Override
-  public void registAndElect() throws Exception {}
-
   private class HeartbeatRunnable implements Runnable {
     @Override
     public void run() {
@@ -304,6 +341,40 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
     }
   }
 
+  @Override
+  public List<AmsServerInfo> getAliveNodes() {
+    List<AmsServerInfo> aliveNodes = new ArrayList<>();
+    if (!isLeader.get()) {
+      LOG.warn("Only leader node can get alive nodes list");
+      return aliveNodes;
+    }
+    try {
+      long currentTime = System.currentTimeMillis();
+      List<HaLeaseMeta> leases =
+          getAs(
+              HaLeaseMapper.class,
+              mapper -> mapper.selectLeasesByService(clusterName, 
OPTIMIZING_SERVICE));
+      for (HaLeaseMeta lease : leases) {
+        // Only include nodes with valid (non-expired) leases
+        if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > 
currentTime) {
+          if (lease.getServerInfoJson() != null && 
!lease.getServerInfoJson().isEmpty()) {
+            try {
+              AmsServerInfo nodeInfo =
+                  JacksonUtil.parseObject(lease.getServerInfoJson(), 
AmsServerInfo.class);
+              aliveNodes.add(nodeInfo);
+            } catch (Exception e) {
+              LOG.warn("Failed to parse server info for node {}", 
lease.getNodeId(), e);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to get alive nodes from database", e);
+      throw e;
+    }
+    return aliveNodes;
+  }
+
   private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int 
restBindPort) {
     AmsServerInfo amsServerInfo = new AmsServerInfo();
     amsServerInfo.setHost(host);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
index 041c1e469..7139bb679 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -18,6 +18,10 @@
 
 package org.apache.amoro.server.ha;
 
+import org.apache.amoro.client.AmsServerInfo;
+
+import java.util.List;
+
 /**
  * Common interface for high availability (HA) containers.
  *
@@ -49,5 +53,19 @@ public interface HighAvailabilityContainer {
    *
    * @throws Exception If registration fails or participation in the primary 
election fails.
    */
-  void registAndElect() throws Exception;
+  void registerAndElect() throws Exception;
+
+  /**
+   * Used in master-slave mode to obtain information about all currently 
registered AMS nodes.
+   *
+   * @return the list of {@link AmsServerInfo} entries for the registered AMS nodes
+   */
+  List<AmsServerInfo> getAliveNodes();
+
+  /**
+   * Used to determine whether the current AMS node is the primary node.
+   *
+   * @return true if the current AMS node is the primary node, false otherwise
+   */
+  boolean hasLeadership();
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
index ef55b9ac7..f5fd040af 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -18,9 +18,11 @@
 
 package org.apache.amoro.server.ha;
 
+import org.apache.amoro.client.AmsServerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 /** No-op HA container that never blocks and performs no leader election. */
@@ -48,5 +50,15 @@ public class NoopHighAvailabilityContainer implements 
HighAvailabilityContainer
   }
 
   @Override
-  public void registAndElect() throws Exception {}
+  public void registerAndElect() throws Exception {}
+
+  @Override
+  public List<AmsServerInfo> getAliveNodes() {
+    return List.of();
+  }
+
+  @Override
+  public boolean hasLeadership() {
+    return false;
+  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index de25d4901..7c070f3b6 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -46,6 +46,8 @@ import javax.security.auth.login.Configuration;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -55,13 +57,27 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
 
   private final LeaderLatch leaderLatch;
   private final CuratorFramework zkClient;
+
+  // Package-private accessors for testing
+  CuratorFramework getZkClient() {
+    return zkClient;
+  }
+
+  LeaderLatch getLeaderLatch() {
+    return leaderLatch;
+  }
+
   private final String tableServiceMasterPath;
   private final String optimizingServiceMasterPath;
+  private final String nodesPath;
   private final AmsServerInfo tableServiceServerInfo;
   private final AmsServerInfo optimizingServiceServerInfo;
+  private final boolean isMasterSlaveMode;
   private volatile CountDownLatch followerLatch;
+  private String registeredNodePath;
 
   public ZkHighAvailabilityContainer(Configurations serviceConfig) throws 
Exception {
+    this.isMasterSlaveMode = 
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
     if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
       String zkServerAddress = 
serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS);
       int zkSessionTimeout =
@@ -71,6 +87,7 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
       String haClusterName = 
serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
       tableServiceMasterPath = 
AmsHAProperties.getTableServiceMasterPath(haClusterName);
       optimizingServiceMasterPath = 
AmsHAProperties.getOptimizingServiceMasterPath(haClusterName);
+      nodesPath = AmsHAProperties.getNodesPath(haClusterName);
       ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 
3, 5000);
       setupZookeeperAuth(serviceConfig);
       this.zkClient =
@@ -83,6 +100,7 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
       zkClient.start();
       createPathIfNeeded(tableServiceMasterPath);
       createPathIfNeeded(optimizingServiceMasterPath);
+      createPathIfNeeded(nodesPath);
       String leaderPath = AmsHAProperties.getLeaderPath(haClusterName);
       createPathIfNeeded(leaderPath);
       leaderLatch = new LeaderLatch(zkClient, leaderPath);
@@ -103,8 +121,10 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
       zkClient = null;
       tableServiceMasterPath = null;
       optimizingServiceMasterPath = null;
+      nodesPath = null;
       tableServiceServerInfo = null;
       optimizingServiceServerInfo = null;
+      registeredNodePath = null;
       // block follower latch forever when ha is disabled
       followerLatch = new CountDownLatch(1);
     }
@@ -141,8 +161,25 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
   }
 
   @Override
-  public void registAndElect() throws Exception {
-    // TODO Here you can register for AMS and participate in the election.
+  public void registerAndElect() throws Exception {
+    if (!isMasterSlaveMode) {
+      LOG.debug("Master-slave mode is not enabled, skip node registration");
+      return;
+    }
+    if (zkClient == null || nodesPath == null) {
+      LOG.warn("HA is not enabled, skip node registration");
+      return;
+    }
+    // Register node to ZK using ephemeral node
+    // The node will be automatically deleted when the session expires
+    String nodeInfo = JacksonUtil.toJSONString(optimizingServiceServerInfo);
+    registeredNodePath =
+        zkClient
+            .create()
+            .creatingParentsIfNeeded()
+            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+            .forPath(nodesPath + "/node-", 
nodeInfo.getBytes(StandardCharsets.UTF_8));
+    LOG.info("Registered AMS node to ZK: {}", registeredNodePath);
   }
 
   @Override
@@ -158,6 +195,18 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
   public void close() {
     if (leaderLatch != null) {
       try {
+        // Unregister node from ZK
+        if (registeredNodePath != null) {
+          try {
+            zkClient.delete().forPath(registeredNodePath);
+            LOG.info("Unregistered AMS node from ZK: {}", registeredNodePath);
+          } catch (KeeperException.NoNodeException e) {
+            // Node already deleted, ignore
+            LOG.debug("Node {} already deleted", registeredNodePath);
+          } catch (Exception e) {
+            LOG.warn("Failed to unregister node from ZK: {}", 
registeredNodePath, e);
+          }
+        }
         this.leaderLatch.close();
         this.zkClient.close();
       } catch (IOException e) {
@@ -192,6 +241,60 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
     return amsServerInfo;
   }
 
+  /**
+   * Get list of alive nodes. Returns an empty list unless this node is the leader.
+   *
+   * @return List of alive node information
+   */
+  public List<AmsServerInfo> getAliveNodes() {
+    List<AmsServerInfo> aliveNodes = new ArrayList<>();
+    if (!isMasterSlaveMode) {
+      LOG.debug("Master-slave mode is not enabled, return empty node list");
+      return aliveNodes;
+    }
+    if (zkClient == null || nodesPath == null) {
+      LOG.warn("HA is not enabled, return empty node list");
+      return aliveNodes;
+    }
+    if (!leaderLatch.hasLeadership()) {
+      LOG.warn("Only leader node can get alive nodes list");
+      return aliveNodes;
+    }
+    try {
+      List<String> nodePaths = zkClient.getChildren().forPath(nodesPath);
+      for (String nodePath : nodePaths) {
+        try {
+          String fullPath = nodesPath + "/" + nodePath;
+          byte[] data = zkClient.getData().forPath(fullPath);
+          if (data != null && data.length > 0) {
+            String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
+            AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, 
AmsServerInfo.class);
+            aliveNodes.add(nodeInfo);
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed to get node info for path: {}", nodePath, e);
+        }
+      }
+    } catch (KeeperException.NoNodeException e) {
+      LOG.debug("Nodes path {} does not exist", nodesPath);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return aliveNodes;
+  }
+
+  /**
+   * Check if current node is the leader.
+   *
+   * @return true if current node is the leader, false otherwise
+   */
+  public boolean hasLeadership() {
+    if (leaderLatch == null) {
+      return false;
+    }
+    return leaderLatch.hasLeadership();
+  }
+
   private void createPathIfNeeded(String path) throws Exception {
     try {
       
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
index c3ce95d74..b7f772373 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
@@ -107,6 +107,20 @@ public interface HaLeaseMapper {
   HaLeaseMeta selectLease(
       @Param("clusterName") String clusterName, @Param("serviceName") String 
serviceName);
 
+  /**
+   * Select all leases for cluster and service.
+   *
+   * @param clusterName cluster name
+   * @param serviceName service name
+   * @return list of lease rows
+   */
+  @Select(
+      "SELECT cluster_name, service_name, node_id, node_ip, server_info_json, 
lease_expire_ts, version, updated_at "
+          + "FROM ha_lease WHERE cluster_name = #{clusterName} AND 
service_name = #{serviceName}")
+  @ResultMap("HaLeaseMetaMap")
+  List<HaLeaseMeta> selectLeasesByService(
+      @Param("clusterName") String clusterName, @Param("serviceName") String 
serviceName);
+
   /**
    * Select current lease for cluster and service.
    *
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java
new file mode 100644
index 000000000..3f9433132
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/TestZkHighAvailabilityContainer.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.properties.AmsHAProperties;
+import org.apache.amoro.server.AmoroManagementConf;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
+import org.apache.amoro.utils.JacksonUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test for HighAvailabilityContainer using mocked ZK to avoid connection 
issues. */
+public class TestZkHighAvailabilityContainer {
+
+  private Configurations serviceConfig;
+  private HighAvailabilityContainer haContainer;
+  private MockZkState mockZkState;
+  private CuratorFramework mockZkClient;
+  private LeaderLatch mockLeaderLatch;
+
+  @Before
+  public void setUp() throws Exception {
+    mockZkState = new MockZkState();
+    mockZkClient = createMockZkClient();
+    mockLeaderLatch = createMockLeaderLatch();
+
+    // Create test configuration
+    serviceConfig = new Configurations();
+    serviceConfig.setString(AmoroManagementConf.SERVER_EXPOSE_HOST, 
"127.0.0.1");
+    
serviceConfig.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, 
1260);
+    
serviceConfig.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
 1261);
+    serviceConfig.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, 1630);
+    serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, true);
+    serviceConfig.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS, 
"127.0.0.1:2181");
+    serviceConfig.setString(AmoroManagementConf.HA_CLUSTER_NAME, 
"test-cluster");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (haContainer != null) {
+      haContainer.close();
+    }
+    mockZkState.clear();
+  }
+
+  @Test
+  public void testRegisterAndElectWithoutMasterSlaveMode() throws Exception {
+    // Test that node registration is skipped when master-slave mode is 
disabled
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
+    haContainer = createContainerWithMockZk();
+
+    // Should not throw exception and should not register node
+    haContainer.registerAndElect();
+
+    // Verify no node was registered
+    String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+    List<String> children = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals(
+        "No nodes should be registered when master-slave mode is disabled", 0, 
children.size());
+  }
+
+  @Test
+  public void testRegisterAndElectWithMasterSlaveMode() throws Exception {
+    // Test that node registration works when master-slave mode is enabled
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    haContainer = createContainerWithMockZk();
+
+    // Register node
+    haContainer.registerAndElect();
+
+    // Verify node was registered
+    String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+    List<String> children = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals("One node should be registered", 1, children.size());
+
+    // Verify node data
+    String nodePath = nodesPath + "/" + children.get(0);
+    byte[] data = mockZkState.getData(nodePath);
+    Assert.assertNotNull("Node data should not be null", data);
+    Assert.assertTrue("Node data should not be empty", data.length > 0);
+
+    // Verify node info
+    String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
+    AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, 
AmsServerInfo.class);
+    Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost());
+    Assert.assertEquals(
+        "Thrift port should match", Integer.valueOf(1261), 
nodeInfo.getThriftBindPort());
+  }
+
+  @Test
+  public void testGetAliveNodesWithoutMasterSlaveMode() throws Exception {
+    // Test that getAliveNodes returns empty list when master-slave mode is 
disabled
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
+    haContainer = createContainerWithMockZk();
+
+    List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+    Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+    Assert.assertEquals(
+        "Alive nodes list should be empty when master-slave mode is disabled",
+        0,
+        aliveNodes.size());
+  }
+
+  @Test
+  public void testGetAliveNodesWhenNotLeader() throws Exception {
+    // Test that getAliveNodes returns empty list when not leader
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    mockLeaderLatch = createMockLeaderLatch(false); // Not leader
+    haContainer = createContainerWithMockZk();
+
+    // Register node
+    haContainer.registerAndElect();
+
+    // Since we're not the leader, should return empty list
+    List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+    Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+    Assert.assertEquals("Alive nodes list should be empty when not leader", 0, 
aliveNodes.size());
+  }
+
+  @Test
+  public void testGetAliveNodesAsLeader() throws Exception {
+    // Test that getAliveNodes returns nodes when leader
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    mockLeaderLatch = createMockLeaderLatch(true); // Is leader
+    haContainer = createContainerWithMockZk();
+
+    // Register node
+    haContainer.registerAndElect();
+
+    // Verify we are leader
+    Assert.assertTrue("Should be leader", haContainer.hasLeadership());
+
+    // Get alive nodes
+    List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+    Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+    Assert.assertEquals("Should have one alive node", 1, aliveNodes.size());
+
+    // Verify node info
+    AmsServerInfo nodeInfo = aliveNodes.get(0);
+    Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost());
+    Assert.assertEquals(
+        "Thrift port should match", Integer.valueOf(1261), 
nodeInfo.getThriftBindPort());
+    Assert.assertEquals(
+        "HTTP port should match", Integer.valueOf(1630), 
nodeInfo.getRestBindPort());
+  }
+
+  @Test
+  public void testGetAliveNodesWithMultipleNodes() throws Exception {
+    // Test that getAliveNodes returns all registered nodes
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    mockLeaderLatch = createMockLeaderLatch(true); // Is leader
+    haContainer = createContainerWithMockZk();
+
+    // Register first node
+    haContainer.registerAndElect();
+
+    // Verify first node was registered
+    String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+    List<String> childrenAfterFirst = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals("First node should be registered", 1, 
childrenAfterFirst.size());
+
+    // Register second node manually in mock state
+    // Use createNode with sequential path to get the correct sequence number
+    AmsServerInfo nodeInfo2 = new AmsServerInfo();
+    nodeInfo2.setHost("127.0.0.2");
+    nodeInfo2.setThriftBindPort(1262);
+    nodeInfo2.setRestBindPort(1631);
+    String nodeInfo2Json = JacksonUtil.toJSONString(nodeInfo2);
+    // Use sequential path ending with "-" to let createNode generate the 
sequence number
+    // This ensures the second node gets the correct sequence number 
(0000000001)
+    mockZkState.createNode(nodesPath + "/node-", 
nodeInfo2Json.getBytes(StandardCharsets.UTF_8));
+
+    // Get alive nodes
+    List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+    Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+    Assert.assertEquals("Should have two alive nodes", 2, aliveNodes.size());
+  }
+
+  @Test
+  public void testCloseUnregistersNode() throws Exception {
+    // Test that close() unregisters the node
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    haContainer = createContainerWithMockZk();
+
+    // Register node
+    haContainer.registerAndElect();
+
+    // Verify node was registered
+    String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+    List<String> children = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals("One node should be registered", 1, children.size());
+
+    // Close container
+    haContainer.close();
+    haContainer = null;
+
+    // Verify node was unregistered
+    List<String> childrenAfterClose = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals("No nodes should be registered after close", 0, 
childrenAfterClose.size());
+  }
+
+  @Test
+  public void testHasLeadership() throws Exception {
+    // Test hasLeadership() method
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    mockLeaderLatch = createMockLeaderLatch(false); // Not leader initially
+    haContainer = createContainerWithMockZk();
+
+    // Initially should not be leader
+    Assert.assertFalse("Should not be leader initially", 
haContainer.hasLeadership());
+
+    // Change to leader
+    mockLeaderLatch = createMockLeaderLatch(true);
+    haContainer = createContainerWithMockZk();
+
+    // Should be leader now
+    Assert.assertTrue("Should be leader", haContainer.hasLeadership());
+  }
+
+  @Test
+  public void testRegisterAndElectWithoutHAEnabled() throws Exception {
+    // Test that registerAndElect skips node registration when HA is not enabled
+    serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    haContainer = new ZkHighAvailabilityContainer(serviceConfig);
+
+    // Should not throw exception
+    haContainer.registerAndElect();
+  }
+
+  /** Create HighAvailabilityContainer with mocked ZK components using 
reflection. */
+  private HighAvailabilityContainer createContainerWithMockZk() throws 
Exception {
+    // Create container without ZK connection to avoid any connection attempts
+    HighAvailabilityContainer container = createContainerWithoutZk();
+
+    // Inject mock ZK client and leader latch (fields are on 
ZkHighAvailabilityContainer)
+    java.lang.reflect.Field zkClientField =
+        ZkHighAvailabilityContainer.class.getDeclaredField("zkClient");
+    zkClientField.setAccessible(true);
+    zkClientField.set(container, mockZkClient);
+
+    java.lang.reflect.Field leaderLatchField =
+        ZkHighAvailabilityContainer.class.getDeclaredField("leaderLatch");
+    leaderLatchField.setAccessible(true);
+    leaderLatchField.set(container, mockLeaderLatch);
+
+    // Note: We don't need to create the paths themselves as nodes in ZK
+    // ZK paths are logical containers, not actual nodes
+    // The createPathIfNeeded() calls will be handled by the mock when needed
+
+    return container;
+  }
+
+  /**
+   * Create a HighAvailabilityContainer without initializing ZK connection. This is used when we
+   * want to completely avoid ZK connection attempts.
+   *
+   * <p>The instance is a ZkHighAvailabilityContainer constructed from a copy of {@code
+   * serviceConfig} that has HA disabled. Its {@code isMasterSlaveMode} field is then set by
+   * reflection from the original {@code serviceConfig}; when that config has HA enabled, the
+   * master paths, the nodes path and both server infos are set the same way.
+   *
+   * @return the container, typed as the HighAvailabilityContainer interface
+   * @throws Exception if construction fails or a field cannot be found or written
+   */
+  private HighAvailabilityContainer createContainerWithoutZk() throws Exception {
+    // Construct from a config copy that disables HA to avoid a ZK connection. The constructor is
+    // called directly, as the tests in this class already do, so no reflection is needed here.
+    Configurations tempConfig = new Configurations(serviceConfig);
+    tempConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+    HighAvailabilityContainer container = new ZkHighAvailabilityContainer(tempConfig);
+
+    // Everything below reads the real serviceConfig, not the HA-disabled copy
+    setZkContainerField(
+        container,
+        "isMasterSlaveMode",
+        serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE));
+
+    if (!serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+      return container;
+    }
+
+    String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+    setZkContainerField(
+        container, "tableServiceMasterPath", AmsHAProperties.getTableServiceMasterPath(haClusterName));
+    setZkContainerField(
+        container,
+        "optimizingServiceMasterPath",
+        AmsHAProperties.getOptimizingServiceMasterPath(haClusterName));
+    setZkContainerField(container, "nodesPath", AmsHAProperties.getNodesPath(haClusterName));
+
+    // Both server infos share the exposed host and HTTP port; only the thrift port differs
+    String exposeHost = serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST);
+    Integer httpPort = serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT);
+    setZkContainerField(
+        container,
+        "tableServiceServerInfo",
+        buildServerInfo(
+            exposeHost,
+            serviceConfig.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT),
+            httpPort));
+    setZkContainerField(
+        container,
+        "optimizingServiceServerInfo",
+        buildServerInfo(
+            exposeHost,
+            serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT),
+            httpPort));
+
+    return container;
+  }
+
+  /** Writes {@code value} into the named field declared on ZkHighAvailabilityContainer. */
+  private void setZkContainerField(
+      HighAvailabilityContainer container, String fieldName, Object value) throws Exception {
+    java.lang.reflect.Field field =
+        ZkHighAvailabilityContainer.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(container, value);
+  }
+
+  /** Assemble an AmsServerInfo from the given host, thrift bind port and REST (HTTP) port. */
+  private AmsServerInfo buildServerInfo(String host, Integer thriftPort, Integer httpPort) {
+    AmsServerInfo info = new AmsServerInfo();
+    info.setRestBindPort(httpPort);
+    info.setThriftBindPort(thriftPort);
+    info.setHost(host);
+    return info;
+  }
+
+  /**
+   * Create a mock CuratorFramework that uses MockZkState for storage.
+   *
+   * <p>Stubbed entry points: {@code getChildren()}, {@code getData()}, {@code create()} (only the
+   * {@code creatingParentsIfNeeded().withMode(..).forPath(..)} chain), {@code delete()}, {@code
+   * checkExists()}, {@code start()} and {@code close()}.
+   *
+   * <p>The method-level unchecked suppression covers the assignment of the raw-class mock to the
+   * parameterized {@code ProtectACLCreateModeStatPathAndBytesable<String>} variable.
+   *
+   * @return the stubbed client
+   * @throws Exception declared because the stubbed builder methods declare checked exceptions
+   */
+  @SuppressWarnings("unchecked")
+  private CuratorFramework createMockZkClient() throws Exception {
+    CuratorFramework mockClient = mock(CuratorFramework.class);
+
+    // Mock getChildren() - forPath(path) lists the direct children recorded in mockZkState
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
+        getChildrenBuilder =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .GetChildrenBuilder.class);
+    when(mockClient.getChildren()).thenReturn(getChildrenBuilder);
+    when(getChildrenBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.getChildren(path);
+            });
+
+    // Mock getData() - forPath(path) returns the stored bytes or throws NoNodeException
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+        getDataBuilder =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+                    .class);
+    when(mockClient.getData()).thenReturn(getDataBuilder);
+    when(getDataBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.getData(path);
+            });
+
+    // Mock create() - manually create the entire fluent API chain to ensure consistency
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder createBuilder =
+        mock(
+            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder.class);
+
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                .ProtectACLCreateModeStatPathAndBytesable<
+            String>
+        pathAndBytesable =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .ProtectACLCreateModeStatPathAndBytesable.class);
+
+    when(mockClient.create()).thenReturn(createBuilder);
+
+    // Mock the chain: creatingParentsIfNeeded() -> withMode() -> forPath()
+    // Use the same mock object for the entire chain
+    when(createBuilder.creatingParentsIfNeeded()).thenReturn(pathAndBytesable);
+    when(pathAndBytesable.withMode(any(CreateMode.class))).thenReturn(pathAndBytesable);
+
+    // Mock forPath(path, data) - used by registerAndElect()
+    when(pathAndBytesable.forPath(anyString(), any(byte[].class)))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              byte[] data = invocation.getArgument(1);
+              return mockZkState.createNode(path, data);
+            });
+
+    // Mock forPath(path) - used by createPathIfNeeded()
+    // Paths created without data are still stored (as empty nodes) so that getChildren() on
+    // their parent can see them.
+    when(pathAndBytesable.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              // Leave an existing node and its data untouched
+              if (mockZkState.exists(path) != null) {
+                return path;
+              }
+              // Like Curator's create chain, hand back the path of the created node
+              return mockZkState.createNode(path, new byte[0]);
+            });
+
+    // Mock delete() - forPath(path) removes the node or throws NoNodeException
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder deleteBuilder =
+        mock(
+            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder.class);
+    when(mockClient.delete()).thenReturn(deleteBuilder);
+    doAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              mockZkState.deleteNode(path);
+              return null;
+            })
+        .when(deleteBuilder)
+        .forPath(anyString());
+
+    // Mock checkExists() - forPath(path) returns a Stat for stored nodes and null otherwise
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+        checkExistsBuilder =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+                    .class);
+    when(mockClient.checkExists()).thenReturn(checkExistsBuilder);
+    when(checkExistsBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.exists(path);
+            });
+
+    // Mock start() and close() as no-ops
+    doAnswer(invocation -> null).when(mockClient).start();
+    doAnswer(invocation -> null).when(mockClient).close();
+
+    return mockClient;
+  }
+
+  /** Create a mock LeaderLatch. */
+  private LeaderLatch createMockLeaderLatch() throws Exception {
+    return createMockLeaderLatch(true);
+  }
+
+  /**
+   * Create a mock LeaderLatch with specified leadership status.
+   *
+   * @param hasLeadership value the mock returns from {@code hasLeadership()}
+   */
+  private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws Exception {
+    LeaderLatch latch = mock(LeaderLatch.class);
+
+    // Lifecycle and listener registration are stubbed as no-ops
+    doAnswer(invocation -> null).when(latch).start();
+    doAnswer(invocation -> null).when(latch).close();
+    doAnswer(invocation -> null).when(latch).addListener(any());
+    // await() returns immediately; the mock doesn't actually wait
+    doAnswer(invocation -> null).when(latch).await();
+
+    when(latch.hasLeadership()).thenReturn(hasLeadership);
+    return latch;
+  }
+
+  /**
+   * In-memory ZK state simulator.
+   *
+   * <p>Nodes live in a flat map from full path to data. A path handed to {@link
+   * #createNode(String, byte[])} that ends with {@code "-"} gets a 10-digit, zero-padded counter
+   * value appended before it is stored.
+   */
+  private static class MockZkState {
+    private final Map<String, byte[]> nodes = new HashMap<>();
+    private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+    /** Returns the sorted names of the nodes stored exactly one level below {@code path}. */
+    public List<String> getChildren(String path) throws KeeperException {
+      String childPrefix = path.endsWith("/") ? path : path + "/";
+      List<String> directChildren = new ArrayList<>();
+      for (String candidate : nodes.keySet()) {
+        // Skip the path itself and anything outside its subtree
+        if (candidate.equals(path) || !candidate.startsWith(childPrefix)) {
+          continue;
+        }
+        String childName = candidate.substring(childPrefix.length());
+        // A remaining slash means the candidate is a deeper descendant, not a direct child
+        if (childName.indexOf('/') < 0) {
+          directChildren.add(childName);
+        }
+      }
+      // Sort to ensure consistent ordering
+      directChildren.sort(String::compareTo);
+      return directChildren;
+    }
+
+    /** Returns the data stored at {@code path}, or throws NoNodeException if there is none. */
+    public byte[] getData(String path) throws KeeperException {
+      byte[] stored = nodes.get(path);
+      if (stored != null) {
+        return stored;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /** Stores {@code data} under {@code path} and returns the path actually used. */
+    public String createNode(String path, byte[] data) {
+      String actualPath = path;
+      // Handle sequential nodes
+      if (path.endsWith("-")) {
+        actualPath = path + String.format("%010d", sequenceCounter.incrementAndGet());
+      }
+      nodes.put(actualPath, data);
+      return actualPath;
+    }
+
+    /** Removes the node at {@code path}, or throws NoNodeException if it is not stored. */
+    public void deleteNode(String path) throws KeeperException {
+      if (nodes.containsKey(path)) {
+        nodes.remove(path);
+        return;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /** Returns a fresh Stat when {@code path} is stored, otherwise {@code null}. */
+    public Stat exists(String path) {
+      if (!nodes.containsKey(path)) {
+        return null;
+      }
+      return new Stat();
+    }
+
+    /** Drops all nodes and resets the sequence counter to zero. */
+    public void clear() {
+      sequenceCounter.set(0);
+      nodes.clear();
+    }
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java 
b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
index e794b520d..08b9ef04a 100644
--- 
a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
+++ 
b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
@@ -25,6 +25,7 @@ public class AmsHAProperties {
   private static final String LEADER_PATH = "/leader";
   private static final String TABLE_SERVICE_MASTER_PATH = "/master";
   private static final String OPTIMIZING_SERVICE_MASTER_PATH = 
"/optimizing-service-master";
+  private static final String NODES_PATH = "/nodes";
   private static final String NAMESPACE_DEFAULT = "default";
 
   private static String getBasePath(String namespace) {
@@ -45,4 +46,8 @@ public class AmsHAProperties {
   public static String getLeaderPath(String namespace) {
     return getBasePath(namespace) + LEADER_PATH;
   }
+
+  public static String getNodesPath(String namespace) {
+    return getBasePath(namespace) + NODES_PATH;
+  }
 }


Reply via email to