This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new e0246aae0 [AMORO-3998]support DATABASE HA SERVICE (#3997)
e0246aae0 is described below

commit e0246aae056d823188f818bc6396fafcc6f2314b
Author: LiangDai-Mars <[email protected]>
AuthorDate: Mon Dec 22 16:49:26 2025 +0800

    [AMORO-3998]support DATABASE HA SERVICE (#3997)
    
    support DATABASE HA SERVICE
    
    Co-authored-by: dailiang <[email protected]>
---
 .../apache/amoro/server/AmoroManagementConf.java   |  22 ++
 .../amoro/server/AmoroManagementConfValidator.java |  21 +-
 .../apache/amoro/server/AmoroServiceContainer.java |   4 +-
 .../ha/DataBaseHighAvailabilityContainer.java      | 311 +++++++++++++++++
 .../amoro/server/ha/HighAvailabilityContainer.java |  45 +++
 .../ha/HighAvailabilityContainerFactory.java       |  65 ++++
 .../server/ha/NoopHighAvailabilityContainer.java   |  45 +++
 .../ZkHighAvailabilityContainer.java}              |  12 +-
 .../amoro/server/persistence/HaLeaseMeta.java      | 158 +++++++++
 .../persistence/SqlSessionFactoryProvider.java     |   2 +
 .../server/persistence/mapper/HaLeaseMapper.java   | 197 +++++++++++
 .../src/main/resources/derby/ams-derby-init.sql    |  17 +-
 .../src/main/resources/mysql/ams-mysql-init.sql    |  14 +
 .../main/resources/postgres/ams-postgres-init.sql  |  25 +-
 .../server/HighAvailabilityContainerTest.java      |   5 +-
 .../ha/DataBaseHighAvailabilityContainerIT.java    | 370 +++++++++++++++++++++
 .../amoro/server/ha/DatabaseHaEdgeCasesIT.java     | 208 ++++++++++++
 .../org/apache/amoro/server/ha/HATestConfigs.java  |  98 ++++++
 docs/configuration/ams-config.md                   |   3 +
 19 files changed, 1609 insertions(+), 13 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 8ba5439bd..579142b2e 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -204,12 +204,24 @@ public class AmoroManagementConf {
           .defaultValue(false)
           .withDescription("Whether to enable high availability mode.");
 
+  /**
+   * Selects the HA backend. Only read when ha.enabled is true; the value is lower-cased before
+   * being compared with {@link #HA_TYPE_ZK} and {@link #HA_TYPE_DATABASE}.
+   */
+  public static final ConfigOption<String> HA_TYPE =
+      ConfigOptions.key("ha.type")
+          .stringType()
+          .defaultValue(AmoroManagementConf.HA_TYPE_ZK)
+          .withDescription("High availability implementation type: zk or 
database.");
+
   public static final ConfigOption<String> HA_CLUSTER_NAME =
       ConfigOptions.key("ha.cluster-name")
           .stringType()
           .defaultValue("default")
           .withDescription("Amoro management service cluster name.");
 
+  /**
+   * Period of the database HA heartbeat, which tries to acquire or renew the lease on every tick.
+   * The database container truncates it to whole seconds and uses at least one second.
+   */
+  public static final ConfigOption<java.time.Duration> HA_HEARTBEAT_INTERVAL =
+      ConfigOptions.key("ha.heartbeat-interval")
+          .durationType()
+          .defaultValue(java.time.Duration.ofSeconds(10))
+          .withDescription("HA heartbeat interval.");
+
   public static final ConfigOption<String> HA_ZOOKEEPER_ADDRESS =
       ConfigOptions.key("ha.zookeeper-address")
           .stringType()
@@ -247,6 +259,12 @@ public class AmoroManagementConf {
           .defaultValue(Duration.ofSeconds(300))
           .withDescription("The Zookeeper connection timeout in 
milliseconds.");
 
+  /**
+   * Lifetime of the database HA lease. Each acquire or renew attempt sets the expiration to the
+   * current time plus this TTL (truncated to whole seconds). It should be larger than
+   * {@link #HA_HEARTBEAT_INTERVAL}, otherwise the lease may expire between two renewals.
+   */
+  public static final ConfigOption<java.time.Duration> HA_LEASE_TTL =
+      ConfigOptions.key("ha.lease-ttl")
+          .durationType()
+          .defaultValue(java.time.Duration.ofSeconds(30))
+          .withDescription("TTL of HA lease.");
+
   public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
       ConfigOptions.key("thrift-server.table-service.bind-port")
           .intType()
@@ -536,6 +554,10 @@ public class AmoroManagementConf {
   public static final String DB_TYPE_MYSQL = "mysql";
   public static final String DB_TYPE_POSTGRES = "postgres";
 
+  // HA config: accepted values of ha.type
+  /** ha.type value selecting the ZooKeeper-based HA container. */
+  public static final String HA_TYPE_ZK = "zk";
+  /** ha.type value selecting the database-lease-based HA container. */
+  public static final String HA_TYPE_DATABASE = "database";
+
   // terminal config
   public static final List<String> TERMINAL_BACKEND_VALUES =
       Arrays.asList("local", "kyuubi", "custom");
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
index bae28f761..1e78520f0 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
@@ -49,11 +49,24 @@ public class AmoroManagementConfValidator {
 
     // HA config
     if (configurations.getBoolean(AmoroManagementConf.HA_ENABLE)) {
-      if 
("".equals(configurations.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS))) 
{
+      String type = 
configurations.getString(AmoroManagementConf.HA_TYPE).toLowerCase();
+      if (!AmoroManagementConf.HA_TYPE_ZK.equals(type)
+          && !AmoroManagementConf.HA_TYPE_DATABASE.equals(type)) {
         throw new IllegalArgumentException(
-            AmoroManagementConf.HA_ZOOKEEPER_ADDRESS.key()
-                + " must be configured when you enable "
-                + "the ams high availability");
+            String.format("Illegal ha.type: %s, zk or database is available", 
type));
+      }
+      if (AmoroManagementConf.HA_TYPE_ZK.equals(type)) {
+        if 
("".equals(configurations.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS))) 
{
+          throw new IllegalArgumentException(
+              AmoroManagementConf.HA_ZOOKEEPER_ADDRESS.key()
+                  + " must be configured when ha.type=zk");
+        }
+      } else {
+        String dbUrl = 
configurations.getString(AmoroManagementConf.DB_CONNECTION_URL);
+        if ("".equals(dbUrl)) {
+          throw new IllegalArgumentException(
+              "database.url must be configured when ha.type=database");
+        }
       }
     }
     // terminal config
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 2fd9f7560..18cdf8666 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -36,6 +36,8 @@ import org.apache.amoro.server.dashboard.JavalinJsonMapper;
 import org.apache.amoro.server.dashboard.response.ErrorResponse;
 import org.apache.amoro.server.dashboard.utils.AmsUtil;
 import org.apache.amoro.server.dashboard.utils.CommonUtil;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import org.apache.amoro.server.ha.HighAvailabilityContainerFactory;
 import org.apache.amoro.server.manager.EventsManager;
 import org.apache.amoro.server.manager.MetricManager;
 import org.apache.amoro.server.persistence.DataSourceFactory;
@@ -115,7 +117,7 @@ public class AmoroServiceContainer {
 
   public AmoroServiceContainer() throws Exception {
     initConfig();
-    haContainer = new HighAvailabilityContainer(serviceConfig);
+    // The factory picks the noop, zk or database container from serviceConfig, which
+    // initConfig() is expected to have populated; keep this after initConfig().
+    haContainer = HighAvailabilityContainerFactory.create(serviceConfig);
   }
 
   public static void main(String[] args) {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
new file mode 100644
index 000000000..70d1253c6
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.HaLeaseMeta;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * High availability (HA) container backed by a lease row in the AMS database.
+ *
+ * <p>Core mechanics:
+ *
+ * <ul>
+ *   <li>A single-threaded scheduler runs a fixed-rate heartbeat: while this node is a follower it
+ *       tries to acquire the lease, while it is the leader it renews the lease.
+ *   <li>The lease row stores an expiration timestamp and a version used for optimistic locking;
+ *       a renewal only succeeds when the version this node last saw is still current.
+ *   <li>The node whose acquire statement updates the row becomes the leader. A renewal that
+ *       updates no row, or renewals that keep failing with errors until the locally recorded
+ *       expiration has passed, demote the node.
+ *   <li>Once demoted, the container never competes for the lease again.
+ * </ul>
+ *
+ * <p>Losing leadership does not exit the process; it counts down the follower latch so that
+ * {@link #waitFollowerShip()} returns.
+ */
+public class DataBaseHighAvailabilityContainer extends PersistentBase
+    implements HighAvailabilityContainer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBaseHighAvailabilityContainer.class);
+  private static final String AMS_SERVICE = "AMS";
+  private static final String TABLE_SERVICE = "TABLE_SERVICE";
+  private static final String OPTIMIZING_SERVICE = "OPTIMIZING_SERVICE";
+  /** Delay before the first lease acquisition attempt, in seconds. */
+  private static final long INITIAL_HEARTBEAT_DELAY_SECONDS = 5;
+
+  private final Configurations serviceConfig;
+  private final ScheduledExecutorService executor;
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+  /** Prevent re-gaining leadership once the lease is lost. */
+  private final AtomicBoolean leadershipRevoked = new AtomicBoolean(false);
+
+  /** Counted down when leadership is lost; created before leadership is published. */
+  private volatile CountDownLatch followerLatch;
+
+  private final Lock leaderLock = new ReentrantLock();
+  private final Condition leaderCondition = leaderLock.newCondition();
+
+  private final String clusterName;
+  private final long heartbeatIntervalSeconds;
+  private final long ttlSeconds;
+
+  private final String nodeId;
+  private final String nodeIp;
+  private final AmsServerInfo tableServiceServerInfo;
+  private final AmsServerInfo optimizingServiceServerInfo;
+
+  /** Local lease version (optimistic lock). Null means not acquired yet. */
+  private volatile Integer leaseVersion = null;
+
+  /**
+   * Expiration timestamp (epoch millis, local clock) of the lease this node last acquired or
+   * renewed successfully. Used to step down when renewals keep failing with errors.
+   */
+  private volatile long localLeaseExpireTs = 0L;
+
+  /**
+   * Creates the container and schedules the lease heartbeat.
+   *
+   * @param serviceConfig AMS configuration; HA cluster name, heartbeat interval, lease TTL, expose
+   *     host and service ports are read from it
+   */
+  public DataBaseHighAvailabilityContainer(Configurations serviceConfig) {
+    this.serviceConfig = serviceConfig;
+    this.clusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+    // Both settings are Durations; sub-second parts are truncated.
+    this.heartbeatIntervalSeconds =
+        serviceConfig.get(AmoroManagementConf.HA_HEARTBEAT_INTERVAL).getSeconds();
+    this.ttlSeconds = serviceConfig.get(AmoroManagementConf.HA_LEASE_TTL).getSeconds();
+    if (ttlSeconds <= heartbeatIntervalSeconds) {
+      LOG.warn(
+          "HA lease TTL ({}s) is not larger than the heartbeat interval ({}s); "
+              + "the lease may expire between two renewals",
+          ttlSeconds,
+          heartbeatIntervalSeconds);
+    }
+
+    this.nodeIp = serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST);
+    int tableThriftPort =
+        serviceConfig.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT);
+    int optimizingThriftPort =
+        serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT);
+    int restPort = serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT);
+
+    // Unique node ID (IP + optimizing port + random suffix): several instances on one host, or a
+    // restarted instance, never share an ID.
+    this.nodeId = nodeIp + ":" + optimizingThriftPort + ":" + UUID.randomUUID();
+
+    this.tableServiceServerInfo = buildServerInfo(nodeIp, tableThriftPort, restPort);
+    this.optimizingServiceServerInfo = buildServerInfo(nodeIp, optimizingThriftPort, restPort);
+
+    // Named daemon thread so the heartbeat is identifiable and never keeps the JVM alive alone.
+    this.executor =
+        Executors.newSingleThreadScheduledExecutor(
+            runnable -> {
+              Thread thread = new Thread(runnable, "ams-database-ha-heartbeat");
+              thread.setDaemon(true);
+              return thread;
+            });
+    this.executor.scheduleAtFixedRate(
+        new HeartbeatRunnable(),
+        INITIAL_HEARTBEAT_DELAY_SECONDS,
+        Math.max(1, heartbeatIntervalSeconds),
+        TimeUnit.SECONDS);
+  }
+
+  /**
+   * Blocks until this node holds the lease and has (best effort) written server info for the table
+   * and optimizing services.
+   *
+   * <p>After leadership has been revoked the lease is never re-acquired, so a later call blocks
+   * until the calling thread is interrupted.
+   *
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  @Override
+  public void waitLeaderShip() throws InterruptedException {
+    LOG.info("Waiting to become the leader of AMS (Database lease)");
+    leaderLock.lock();
+    try {
+      while (!isLeader.get()) {
+        leaderCondition.await();
+      }
+    } finally {
+      leaderLock.unlock();
+    }
+    LOG.info("Became the leader of AMS (Database lease)");
+  }
+
+  /**
+   * Blocks until this node loses leadership. Returns immediately if leadership was never gained,
+   * because no follower latch exists yet.
+   *
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  @Override
+  public void waitFollowerShip() throws InterruptedException {
+    LOG.info("Waiting to become the follower of AMS (Database lease)");
+    CountDownLatch latch = followerLatch;
+    if (latch != null) {
+      latch.await();
+    }
+    LOG.info("Became the follower of AMS (Database lease)");
+  }
+
+  /**
+   * Stops the heartbeat scheduler. The lease row is not released here, so it stays held until it
+   * expires.
+   */
+  @Override
+  public void close() {
+    try {
+      if (executor != null && !executor.isShutdown()) {
+        executor.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.error("Close Database HighAvailabilityContainer failed", e);
+    }
+  }
+
+  /** Heartbeat task: acquire the lease as a follower, renew it as the leader. */
+  private class HeartbeatRunnable implements Runnable {
+    @Override
+    public void run() {
+      if (leadershipRevoked.get()) {
+        // Leadership was lost earlier; this container never competes for the lease again.
+        return;
+      }
+      long now = System.currentTimeMillis();
+      long newExpireTs = now + TimeUnit.SECONDS.toMillis(ttlSeconds);
+      try {
+        if (isLeader.get()) {
+          renewAsLeader(newExpireTs, now);
+        } else {
+          acquireAsFollower(newExpireTs, now);
+        }
+      } catch (Throwable t) {
+        // Never propagate: an exception escaping a scheduleAtFixedRate task cancels all
+        // subsequent heartbeats.
+        LOG.error("Database HA heartbeat failed", t);
+        stepDownIfLeaseExpired();
+      }
+    }
+  }
+
+  /** Follower path: try to take the lease and, on success, publish leadership. */
+  private void acquireAsFollower(long newExpireTs, long now) {
+    boolean success = tryAcquireLease(newExpireTs, now);
+    LOG.info(
+        "Try to acquire AMS lease: success={}, cluster={}, nodeId={}, nodeIp={}",
+        success,
+        clusterName,
+        nodeId,
+        nodeIp);
+    if (success) {
+      onLeaderGained(now);
+    }
+  }
+
+  /** Leader path: renew the lease and demote this node if the renewal updated no row. */
+  private void renewAsLeader(long newExpireTs, long now) {
+    boolean renewed = tryRenewLease(newExpireTs, now);
+    LOG.info(
+        "Try to renew AMS lease: success={}, cluster={}, nodeId={}", renewed, clusterName, nodeId);
+    if (!renewed && isLeader.get()) {
+      onLeaderLost();
+    }
+  }
+
+  /**
+   * Demotes this node when it is still the leader but the lease it last secured has expired by the
+   * local clock. Without this, a leader whose renewals fail with errors (for example, the database
+   * is unreachable) would keep acting as leader after another node may have taken the lease.
+   */
+  private void stepDownIfLeaseExpired() {
+    if (isLeader.get() && System.currentTimeMillis() >= localLeaseExpireTs) {
+      onLeaderLost();
+    }
+  }
+
+  /**
+   * Tries to take the lease for this node.
+   *
+   * @return true if the acquire statement updated exactly one row
+   */
+  private boolean tryAcquireLease(long newExpireTs, long now) {
+    // Make sure the lease row exists; errors (e.g. a concurrent insert by another node) are
+    // ignored.
+    doAsIgnoreError(
+        HaLeaseMapper.class,
+        mapper ->
+            mapper.insertIfAbsent(
+                new HaLeaseMeta(
+                    clusterName, AMS_SERVICE, nodeId, nodeIp, "", newExpireTs, 0, now)));
+
+    // Acquire when the lease is expired or free
+    int affected =
+        updateAs(
+                HaLeaseMapper.class,
+                mapper ->
+                    mapper.acquireLease(
+                        clusterName,
+                        AMS_SERVICE,
+                        nodeId,
+                        nodeIp,
+                        JacksonUtil.toJSONString(tableServiceServerInfo),
+                        newExpireTs,
+                        now))
+            .intValue();
+    if (affected != 1) {
+      return false;
+    }
+    leaseVersion = readLeaseVersion();
+    localLeaseExpireTs = newExpireTs;
+    return true;
+  }
+
+  /**
+   * Extends the lease held by this node, guarded by the locally known version.
+   *
+   * @return true if the renew statement updated exactly one row
+   */
+  private boolean tryRenewLease(long newExpireTs, long now) {
+    if (leaseVersion == null) {
+      leaseVersion = readLeaseVersion();
+    }
+    int expectedVersion = leaseVersion;
+    int affected =
+        updateAs(
+                HaLeaseMapper.class,
+                mapper ->
+                    mapper.renewLease(
+                        clusterName, AMS_SERVICE, nodeId, expectedVersion, newExpireTs, now))
+            .intValue();
+    if (affected != 1) {
+      return false;
+    }
+    leaseVersion = expectedVersion + 1;
+    localLeaseExpireTs = newExpireTs;
+    return true;
+  }
+
+  /** Reads the lease row's version, treating a missing row or a null version as 0. */
+  private int readLeaseVersion() {
+    HaLeaseMeta lease =
+        getAs(HaLeaseMapper.class, mapper -> mapper.selectLease(clusterName, AMS_SERVICE));
+    Integer version = lease == null ? null : lease.getVersion();
+    return version == null ? 0 : version;
+  }
+
+  /**
+   * Publishes this node as leader: prepares the follower latch, writes server info for the table
+   * and optimizing services, then wakes up threads blocked in {@link #waitLeaderShip()}.
+   */
+  private void onLeaderGained(long now) {
+    // Create the latch before leadership becomes visible, so a caller returning from
+    // waitLeaderShip() never finds a null latch and falls straight through waitFollowerShip().
+    followerLatch = new CountDownLatch(1);
+
+    upsertServerInfo(TABLE_SERVICE, JacksonUtil.toJSONString(tableServiceServerInfo), now);
+    upsertServerInfo(
+        OPTIMIZING_SERVICE, JacksonUtil.toJSONString(optimizingServiceServerInfo), now);
+
+    leaderLock.lock();
+    try {
+      isLeader.set(true);
+      leaderCondition.signalAll();
+    } finally {
+      leaderLock.unlock();
+    }
+
+    LOG.info(
+        "Current node become leader, tableService={}, optimizingService={}",
+        tableServiceServerInfo,
+        optimizingServiceServerInfo);
+  }
+
+  /**
+   * Writes this node's server info row for {@code serviceName}: updates the row, inserting it when
+   * the update touched nothing. Best effort via doAsIgnoreError.
+   */
+  private void upsertServerInfo(String serviceName, String serverInfoJson, long now) {
+    doAsIgnoreError(
+        HaLeaseMapper.class,
+        mapper -> {
+          int updated =
+              mapper.updateServerInfo(
+                  clusterName, serviceName, nodeId, nodeIp, serverInfoJson, now);
+          if (updated == 0) {
+            mapper.insertServerInfoIfAbsent(
+                clusterName, serviceName, nodeId, nodeIp, serverInfoJson, now);
+          }
+        });
+  }
+
+  /** Demotes this node permanently and releases threads blocked in waitFollowerShip(). */
+  private void onLeaderLost() {
+    LOG.info(
+        "Current node as leader is disconnected or lost the lease, cluster={}, nodeId={}",
+        clusterName,
+        nodeId);
+    leadershipRevoked.set(true);
+    isLeader.set(false);
+    CountDownLatch latch = followerLatch;
+    if (latch != null) {
+      latch.countDown();
+    }
+  }
+
+  /** Builds the server info advertised for one service endpoint. */
+  private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) {
+    AmsServerInfo amsServerInfo = new AmsServerInfo();
+    amsServerInfo.setHost(host);
+    amsServerInfo.setRestBindPort(restBindPort);
+    amsServerInfo.setThriftBindPort(thriftBindPort);
+    return amsServerInfo;
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
new file mode 100644
index 000000000..15a005e1f
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+/**
+ * Common interface for high availability (HA) containers.
+ *
+ * <p>Provides a unified contract for AMS HA implementations (ZK/DATABASE) to wait for leadership,
+ * wait for follower state, and close resources. It extends {@link AutoCloseable} so a container
+ * can be managed with try-with-resources; {@link #close()} throws no checked exception.
+ */
+public interface HighAvailabilityContainer extends AutoCloseable {
+
+  /**
+   * Blocks until this node gains leadership.
+   *
+   * @throws Exception if waiting fails or the underlying implementation throws an error
+   */
+  void waitLeaderShip() throws Exception;
+
+  /**
+   * Blocks until this node becomes a follower (for example, when leadership is lost).
+   *
+   * @throws Exception if waiting fails or the underlying implementation throws an error
+   */
+  void waitFollowerShip() throws Exception;
+
+  /** Closes the container and releases resources. */
+  @Override
+  void close();
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainerFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainerFactory.java
new file mode 100644
index 000000000..2580b315e
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainerFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds the {@link HighAvailabilityContainer} matching the service configuration.
+ *
+ * <p>With ha.enabled=false a no-op container is returned; otherwise ha.type (case-insensitive)
+ * selects "zk" or "database".
+ */
+public final class HighAvailabilityContainerFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HighAvailabilityContainerFactory.class);
+
+  private HighAvailabilityContainerFactory() {}
+
+  /**
+   * Picks and instantiates the HA container for {@code conf}.
+   *
+   * @param conf service configuration
+   * @return the no-op, ZooKeeper or database container, according to ha.enabled and ha.type
+   * @throws IllegalArgumentException if ha.type names no supported implementation
+   * @throws RuntimeException if the ZooKeeper container cannot be constructed
+   */
+  public static HighAvailabilityContainer create(Configurations conf) {
+    if (!conf.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+      LOG.info("HA is disabled, use NoopHighAvailabilityContainer");
+      return new NoopHighAvailabilityContainer();
+    }
+
+    String haType = conf.getString(AmoroManagementConf.HA_TYPE).toLowerCase();
+    if (AmoroManagementConf.HA_TYPE_DATABASE.equals(haType)) {
+      return new DataBaseHighAvailabilityContainer(conf);
+    }
+    if (AmoroManagementConf.HA_TYPE_ZK.equals(haType)) {
+      return newZkContainer(conf);
+    }
+    throw new IllegalArgumentException(
+        "Unsupported ha.type: " + haType + ", only 'zk' or 'database' are allowed");
+  }
+
+  /** Constructs the ZooKeeper container, rethrowing its checked failures unchecked. */
+  private static HighAvailabilityContainer newZkContainer(Configurations conf) {
+    try {
+      return new ZkHighAvailabilityContainer(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create ZkHighAvailabilityContainer", e);
+    }
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
new file mode 100644
index 000000000..e8bef7d9c
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HA container used when high availability is disabled: no election happens and this single node
+ * is always the leader.
+ *
+ * <p>{@link #waitLeaderShip()} returns immediately. Because leadership is never lost, {@link
+ * #waitFollowerShip()} blocks until {@link #close()} is called, matching the interface contract,
+ * instead of returning at once (which would let a leader/follower loop spin).
+ */
+public class NoopHighAvailabilityContainer implements HighAvailabilityContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(NoopHighAvailabilityContainer.class);
+
+  /** Released by {@link #close()} to unblock {@link #waitFollowerShip()}. */
+  private final java.util.concurrent.CountDownLatch closedLatch =
+      new java.util.concurrent.CountDownLatch(1);
+
+  /** Returns immediately: without HA this node is always the leader. */
+  @Override
+  public void waitLeaderShip() {
+    LOG.info("Noop HA: waitLeaderShip returns immediately");
+  }
+
+  /**
+   * Blocks until {@link #close()} is called, since this node never becomes a follower. If the
+   * waiting thread is interrupted, its interrupt flag is restored and the method returns.
+   */
+  @Override
+  public void waitFollowerShip() {
+    LOG.info("Noop HA: waitFollowerShip blocks until the container is closed");
+    try {
+      closedLatch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Releases any thread blocked in {@link #waitFollowerShip()}. */
+  @Override
+  public void close() {
+    LOG.info("Noop HA: close");
+    closedLatch.countDown();
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
similarity index 96%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
rename to 
amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index 7f74d3924..b168eff55 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -16,11 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server;
+package org.apache.amoro.server.ha;
 
 import org.apache.amoro.client.AmsServerInfo;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.properties.AmsHAProperties;
+import org.apache.amoro.server.AmoroManagementConf;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
@@ -48,9 +49,9 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-public class HighAvailabilityContainer implements LeaderLatchListener {
+public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, 
LeaderLatchListener {
 
-  public static final Logger LOG = 
LoggerFactory.getLogger(HighAvailabilityContainer.class);
+  public static final Logger LOG = 
LoggerFactory.getLogger(ZkHighAvailabilityContainer.class);
 
   private final LeaderLatch leaderLatch;
   private final CuratorFramework zkClient;
@@ -60,7 +61,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
   private final AmsServerInfo optimizingServiceServerInfo;
   private volatile CountDownLatch followerLatch;
 
-  public HighAvailabilityContainer(Configurations serviceConfig) throws 
Exception {
+  public ZkHighAvailabilityContainer(Configurations serviceConfig) throws 
Exception {
     if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
       String zkServerAddress = 
serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS);
       int zkSessionTimeout =
@@ -109,6 +110,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
     }
   }
 
+  @Override
   public void waitLeaderShip() throws Exception {
     LOG.info("Waiting to become the leader of AMS");
     if (leaderLatch != null) {
@@ -138,6 +140,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
     LOG.info("Became the leader of AMS");
   }
 
+  @Override
   public void waitFollowerShip() throws Exception {
     LOG.info("Waiting to become the follower of AMS");
     if (followerLatch != null) {
@@ -146,6 +149,7 @@ public class HighAvailabilityContainer implements 
LeaderLatchListener {
     LOG.info("Became the follower of AMS");
   }
 
+  @Override
   public void close() {
     if (leaderLatch != null) {
       try {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HaLeaseMeta.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HaLeaseMeta.java
new file mode 100644
index 000000000..510fd0c31
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HaLeaseMeta.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence;
+
+import java.util.Objects;
+
+/**
+ * Entity representing an HA lease stored in the database.
+ *
+ * <p>Fields: - cluster_name: AMS cluster name - service_name: service name 
(AMS, TABLE_SERVICE,
+ * OPTIMIZING_SERVICE) - node_id: unique node identifier - node_ip: node IP 
address -
+ * server_info_json: serialized AmsServerInfo - lease_expire_ts: lease 
expiration timestamp
+ * (milliseconds) - version: row version used for optimistic locking - 
updated_at: last update
+ * timestamp (milliseconds)
+ */
+public class HaLeaseMeta {
+  private String clusterName;
+  private String serviceName;
+  private String nodeId;
+  private String nodeIp;
+  private String serverInfoJson;
+  private Long leaseExpireTs;
+  private Integer version;
+  private Long updatedAt;
+
+  public HaLeaseMeta() {}
+
+  public HaLeaseMeta(
+      String clusterName,
+      String serviceName,
+      String nodeId,
+      String nodeIp,
+      String serverInfoJson,
+      Long leaseExpireTs,
+      Integer version,
+      Long updatedAt) {
+    this.clusterName = clusterName;
+    this.serviceName = serviceName;
+    this.nodeId = nodeId;
+    this.nodeIp = nodeIp;
+    this.serverInfoJson = serverInfoJson;
+    this.leaseExpireTs = leaseExpireTs;
+    this.version = version;
+    this.updatedAt = updatedAt;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getNodeId() {
+    return nodeId;
+  }
+
+  public void setNodeId(String nodeId) {
+    this.nodeId = nodeId;
+  }
+
+  public String getNodeIp() {
+    return nodeIp;
+  }
+
+  public void setNodeIp(String nodeIp) {
+    this.nodeIp = nodeIp;
+  }
+
+  public String getServerInfoJson() {
+    return serverInfoJson;
+  }
+
+  public void setServerInfoJson(String serverInfoJson) {
+    this.serverInfoJson = serverInfoJson;
+  }
+
+  public Long getLeaseExpireTs() {
+    return leaseExpireTs;
+  }
+
+  public void setLeaseExpireTs(Long leaseExpireTs) {
+    this.leaseExpireTs = leaseExpireTs;
+  }
+
+  public Integer getVersion() {
+    return version;
+  }
+
+  public void setVersion(Integer version) {
+    this.version = version;
+  }
+
+  public Long getUpdatedAt() {
+    return updatedAt;
+  }
+
+  public void setUpdatedAt(Long updatedAt) {
+    this.updatedAt = updatedAt;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    HaLeaseMeta haLeaseMeta = (HaLeaseMeta) other;
+    return Objects.equals(clusterName, haLeaseMeta.clusterName)
+        && Objects.equals(serviceName, haLeaseMeta.serviceName)
+        && Objects.equals(nodeId, haLeaseMeta.nodeId)
+        && Objects.equals(nodeIp, haLeaseMeta.nodeIp)
+        && Objects.equals(serverInfoJson, haLeaseMeta.serverInfoJson)
+        && Objects.equals(leaseExpireTs, haLeaseMeta.leaseExpireTs)
+        && Objects.equals(version, haLeaseMeta.version)
+        && Objects.equals(updatedAt, haLeaseMeta.updatedAt);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        clusterName,
+        serviceName,
+        nodeId,
+        nodeIp,
+        serverInfoJson,
+        leaseExpireTs,
+        version,
+        updatedAt);
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 49eb9f5db..3086bac5e 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -26,6 +26,7 @@ import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
 import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
 import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
 import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
 import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
 import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.PlatformFileMapper;
@@ -74,6 +75,7 @@ public class SqlSessionFactoryProvider {
     configuration.addMapper(TableBlockerMapper.class);
     configuration.addMapper(TableProcessMapper.class);
     configuration.addMapper(TableRuntimeMapper.class);
+    configuration.addMapper(HaLeaseMapper.class);
 
     PageInterceptor interceptor = new PageInterceptor();
     Properties interceptorProperties = new Properties();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
new file mode 100644
index 000000000..c3ce95d74
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.mapper;
+
+import org.apache.amoro.server.persistence.HaLeaseMeta;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.List;
+
+/**
+ * MyBatis mapper for HA lease table. Provides concrete SQL in annotations.
+ *
+ * <p>Notes: - insertIfAbsent: plain INSERT, duplicate key errors are ignored 
by caller via
+ * doAsIgnoreError. - acquireLease: try to acquire leadership when expired or 
empty. - renewLease:
+ * optimistic lock renewal by version. - selectLease: map a single row to 
{@link HaLeaseMeta}. -
+ * releaseLease: mark lease as expired. - upsertServerInfo: MySQL-style INSERT 
... ON DUPLICATE KEY
+ * UPDATE for server info.
+ */
+public interface HaLeaseMapper {
+
+  /**
+   * Insert a lease row. If the primary key already exists, the caller may 
ignore the exception.
+   *
+   * @param lease lease entity to insert
+   * @return affected rows
+   */
+  @Insert(
+      "INSERT INTO ha_lease "
+          + "(cluster_name, service_name, node_id, node_ip, server_info_json, 
lease_expire_ts, version, updated_at) "
+          + "VALUES (#{lease.clusterName}, #{lease.serviceName}, 
#{lease.nodeId}, #{lease.nodeIp}, "
+          + "#{lease.serverInfoJson}, #{lease.leaseExpireTs}, 
#{lease.version}, #{lease.updatedAt})")
+  int insertIfAbsent(@Param("lease") HaLeaseMeta lease);
+
+  /**
+   * Try to acquire leadership by updating the lease only when it is expired 
or empty. Increments
+   * version and sets new lease_expire_ts.
+   *
+   * @return affected rows (1 means acquired)
+   */
+  @Update(
+      "UPDATE ha_lease "
+          + "SET node_id = #{nodeId}, node_ip = #{nodeIp}, server_info_json = 
#{serverInfoJson}, "
+          + "lease_expire_ts = #{leaseExpireTs}, version = version + 1, 
updated_at = #{updatedAt} "
+          + "WHERE cluster_name = #{clusterName} AND service_name = 
#{serviceName} "
+          + "AND (lease_expire_ts IS NULL OR lease_expire_ts < #{updatedAt})")
+  int acquireLease(
+      @Param("clusterName") String clusterName,
+      @Param("serviceName") String serviceName,
+      @Param("nodeId") String nodeId,
+      @Param("nodeIp") String nodeIp,
+      @Param("serverInfoJson") String serverInfoJson,
+      @Param("leaseExpireTs") Long leaseExpireTs,
+      @Param("updatedAt") Long updatedAt);
+
+  /**
+   * Renew the lease using optimistic concurrency control. Only succeeds if 
the expected version
+   * matches and the lease is not expired.
+   *
+   * @return affected rows (1 means renewed)
+   */
+  @Update(
+      "UPDATE ha_lease "
+          + "SET lease_expire_ts = #{newLeaseExpireTs}, version = version + 1, 
updated_at = #{updatedAt} "
+          + "WHERE cluster_name = #{clusterName} AND service_name = 
#{serviceName} "
+          + "AND node_id = #{nodeId} AND version = #{expectedVersion} AND 
lease_expire_ts > #{updatedAt}")
+  int renewLease(
+      @Param("clusterName") String clusterName,
+      @Param("serviceName") String serviceName,
+      @Param("nodeId") String nodeId,
+      @Param("expectedVersion") Integer expectedVersion,
+      @Param("newLeaseExpireTs") Long newLeaseExpireTs,
+      @Param("updatedAt") Long updatedAt);
+
+  /**
+   * Select current lease for cluster and service.
+   *
+   * @param clusterName cluster name
+   * @param serviceName service name
+   * @return lease row or null
+   */
+  @Select(
+      "SELECT cluster_name, service_name, node_id, node_ip, server_info_json, 
lease_expire_ts, version, updated_at "
+          + "FROM ha_lease WHERE cluster_name = #{clusterName} AND 
service_name = #{serviceName}")
+  @ResultMap("HaLeaseMetaMap")
+  HaLeaseMeta selectLease(
+      @Param("clusterName") String clusterName, @Param("serviceName") String 
serviceName);
+
+  /**
+   * Select current lease for cluster and service.
+   *
+   * @param clusterName cluster name
+   * @param serviceName service name
+   * @return lease row or null
+   */
+  @Select(
+      "SELECT cluster_name, service_name, node_id, node_ip, server_info_json, 
lease_expire_ts, version, updated_at "
+          + "FROM ha_lease")
+  @Results(
+      id = "HaLeaseMetaMap",
+      value = {
+        @Result(column = "cluster_name", property = "clusterName"),
+        @Result(column = "service_name", property = "serviceName"),
+        @Result(column = "node_id", property = "nodeId"),
+        @Result(column = "node_ip", property = "nodeIp"),
+        @Result(column = "server_info_json", property = "serverInfoJson"),
+        @Result(column = "lease_expire_ts", property = "leaseExpireTs"),
+        @Result(column = "version", property = "version"),
+        @Result(column = "updated_at", property = "updatedAt")
+      })
+  List<HaLeaseMeta> selectAllLease();
+
+  /**
+   * Release the lease by marking it expired at updatedAt.
+   *
+   * @return affected rows
+   */
+  @Update(
+      "UPDATE ha_lease SET lease_expire_ts = #{updatedAt}, updated_at = 
#{updatedAt} "
+          + "WHERE cluster_name = #{clusterName} AND service_name = 
#{serviceName} AND node_id = #{nodeId}")
+  int releaseLease(
+      @Param("clusterName") String clusterName,
+      @Param("serviceName") String serviceName,
+      @Param("nodeId") String nodeId,
+      @Param("updatedAt") Long updatedAt);
+
+  /**
+   * Update server info if the row exists (DB-neutral upsert step).
+   *
+   * @return affected rows (1 if updated, 0 if not found)
+   */
+  @Update(
+      "UPDATE ha_lease SET node_id=#{nodeId}, node_ip=#{nodeIp}, 
server_info_json=#{serverInfoJson}, updated_at=#{updatedAt} "
+          + "WHERE cluster_name=#{clusterName} AND 
service_name=#{serviceName}")
+  int updateServerInfo(
+      @Param("clusterName") String clusterName,
+      @Param("serviceName") String serviceName,
+      @Param("nodeId") String nodeId,
+      @Param("nodeIp") String nodeIp,
+      @Param("serverInfoJson") String serverInfoJson,
+      @Param("updatedAt") Long updatedAt);
+
+  /**
+   * Insert server info if absent (DB-neutral upsert step). Sets version=0 for 
non-leader entries.
+   */
+  @Insert(
+      "INSERT INTO ha_lease (cluster_name, service_name, node_id, node_ip, 
server_info_json, version, updated_at) "
+          + "VALUES (#{clusterName}, #{serviceName}, #{nodeId}, #{nodeIp}, 
#{serverInfoJson}, 0, #{updatedAt})")
+  int insertServerInfoIfAbsent(
+      @Param("clusterName") String clusterName,
+      @Param("serviceName") String serviceName,
+      @Param("nodeId") String nodeId,
+      @Param("nodeIp") String nodeIp,
+      @Param("serverInfoJson") String serverInfoJson,
+      @Param("updatedAt") Long updatedAt);
+
+  /**
+   * Upsert server info for non-leader services using MySQL syntax. For 
Postgres, use: INSERT ... ON
+   * CONFLICT (cluster_name, service_name) DO UPDATE SET ...
+   *
+   * @return affected rows
+   */
+  @Insert(
+      "INSERT INTO ha_lease (cluster_name, service_name, node_id, node_ip, 
server_info_json, updated_at) "
+          + "VALUES (#{clusterName}, #{serviceName}, #{nodeId}, #{nodeIp}, 
#{serverInfoJson}, #{updatedAt}) "
+          + "ON DUPLICATE KEY UPDATE "
+          + "node_id = VALUES(node_id), node_ip = VALUES(node_ip), "
+          + "server_info_json = VALUES(server_info_json), updated_at = 
VALUES(updated_at)")
+  int upsertServerInfo(
+      @Param("clusterName") String clusterName,
+      @Param("serviceName") String serviceName,
+      @Param("nodeId") String nodeId,
+      @Param("nodeIp") String nodeIp,
+      @Param("serverInfoJson") String serverInfoJson,
+      @Param("updatedAt") Long updatedAt);
+}
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql 
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index a66faf55f..1e7a7e340 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -249,4 +249,19 @@ CREATE TABLE http_session (
     max_interval  BIGINT,
     data_store    BLOB,
     PRIMARY KEY(session_id, context_path, virtual_host)
-);
\ No newline at end of file
+);
+
-- Lease table for the DATABASE HA mode: one row per (cluster_name, service_name).
-- server_info_json uses VARCHAR(32672), the largest VARCHAR length Derby supports.
CREATE TABLE ha_lease (
    cluster_name      VARCHAR(64)     NOT NULL,
    service_name      VARCHAR(64)     NOT NULL,
    node_id           VARCHAR(256),
    node_ip           VARCHAR(64),
    server_info_json  VARCHAR(32672),
    lease_expire_ts   BIGINT,
    version           INT             NOT NULL DEFAULT 0,
    updated_at        BIGINT          NOT NULL,
    PRIMARY KEY (cluster_name, service_name)
);

-- Secondary indexes: look up leases by expiration time and by holder node.
CREATE INDEX idx_ha_lease_expire ON ha_lease (lease_expire_ts);
CREATE INDEX idx_ha_lease_node ON ha_lease (node_id);
\ No newline at end of file
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql 
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index 50c32326e..f1f91e3a3 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -267,3 +267,17 @@ CREATE TABLE `http_session` (
     PRIMARY KEY(`session_id`, `context_path`, `virtual_host`),
     KEY `idx_session_expiry` (`expiry_time`)
 ) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Http session store' 
ROW_FORMAT=DYNAMIC;
+
-- Lease table for the DATABASE HA mode: one row per (cluster_name, service_name).
CREATE TABLE IF NOT EXISTS `ha_lease` (
    `cluster_name`     VARCHAR(64)  NOT NULL COMMENT 'AMS cluster name',
    `service_name`     VARCHAR(64)  NOT NULL COMMENT 'Service name (AMS/TABLE_SERVICE/OPTIMIZING_SERVICE)',
    `node_id`          VARCHAR(256) NULL COMMENT 'Unique node identifier (host:port:uuid)',
    `node_ip`          VARCHAR(64)  NULL COMMENT 'Node IP address',
    `server_info_json` TEXT         NULL COMMENT 'JSON encoded server info (AmsServerInfo)',
    `lease_expire_ts`  BIGINT       NULL COMMENT 'Lease expiration timestamp (ms since epoch)',
    `version`          INT          NOT NULL DEFAULT 0 COMMENT 'Optimistic lock version of the lease row',
    `updated_at`       BIGINT       NOT NULL COMMENT 'Last update timestamp (ms since epoch)',
    PRIMARY KEY (`cluster_name`, `service_name`),
    KEY `idx_ha_lease_expire` (`lease_expire_ts`) COMMENT 'Index for querying expired leases',
    KEY `idx_ha_lease_node` (`node_id`) COMMENT 'Index for querying leases by node ID'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'HA lease table for leader election and heartbeat renewal';
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql 
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index 190b7d04a..8f13d6862 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -439,4 +439,27 @@ COMMENT ON COLUMN http_session.last_save_time IS 'Last 
save time';
 COMMENT ON COLUMN http_session.expiry_time IS 'Expiry time';
 COMMENT ON COLUMN http_session.max_interval IS 'Max internal';
 COMMENT ON COLUMN http_session.data_store IS 'Session data store';
-COMMENT ON TABLE http_session IS 'Http session store';
\ No newline at end of file
+COMMENT ON TABLE http_session IS 'Http session store';
+
CREATE TABLE IF NOT EXISTS ha_lease (
  cluster_name       VARCHAR(64)  NOT NULL,
  service_name       VARCHAR(64)  NOT NULL,
  node_id            VARCHAR(256) NULL,
  node_ip            VARCHAR(64)  NULL,
  server_info_json   TEXT         NULL,
  lease_expire_ts    BIGINT       NULL,
  version            INT          NOT NULL DEFAULT 0,
  updated_at         BIGINT       NOT NULL,
  PRIMARY KEY (cluster_name, service_name)
);

CREATE INDEX IF NOT EXISTS idx_ha_lease_expire ON ha_lease (lease_expire_ts);
CREATE INDEX IF NOT EXISTS idx_ha_lease_node   ON ha_lease (node_id);

-- PostgreSQL requires COMMENT ON COLUMN targets to be qualified with the table name.
COMMENT ON COLUMN ha_lease.cluster_name IS 'AMS cluster name';
COMMENT ON COLUMN ha_lease.service_name IS 'Service name (AMS/TABLE_SERVICE/OPTIMIZING_SERVICE)';
COMMENT ON COLUMN ha_lease.node_id IS 'Unique node identifier (host:port:uuid)';
COMMENT ON COLUMN ha_lease.node_ip IS 'Node IP address';
COMMENT ON COLUMN ha_lease.server_info_json IS 'JSON encoded server info (AmsServerInfo)';
COMMENT ON COLUMN ha_lease.lease_expire_ts IS 'Lease expiration timestamp (ms since epoch)';
COMMENT ON COLUMN ha_lease.version IS 'Optimistic lock version of the lease row';
COMMENT ON COLUMN ha_lease.updated_at IS 'Last update timestamp (ms since epoch)';
COMMENT ON TABLE ha_lease IS 'HA lease table for leader election and heartbeat renewal';
\ No newline at end of file
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
index d29a1a9ac..92f206d72 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
 import org.apache.amoro.server.util.KerberizedTestHelper;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.Assertions;
@@ -49,7 +50,7 @@ public class HighAvailabilityContainerTest extends 
KerberizedTestHelper {
             conf.set(AmoroManagementConf.HA_ZOOKEEPER_AUTH_PRINCIPAL, 
principal);
             conf.set(AmoroManagementConf.HA_ZOOKEEPER_AUTH_TYPE, "KERBEROS");
 
-            HighAvailabilityContainer.setupZookeeperAuth(conf);
+            ZkHighAvailabilityContainer.setupZookeeperAuth(conf);
             Configuration configuration = Configuration.getConfiguration();
             AppConfigurationEntry[] entries =
                 configuration.getAppConfigurationEntry("AmoroZooKeeperClient");
@@ -66,7 +67,7 @@ public class HighAvailabilityContainerTest extends 
KerberizedTestHelper {
             conf.set(AmoroManagementConf.HA_ZOOKEEPER_AUTH_KEYTAB, 
keytab.getName());
             IOException e =
                 assertThrows(
-                    IOException.class, () -> 
HighAvailabilityContainer.setupZookeeperAuth(conf));
+                    IOException.class, () -> 
ZkHighAvailabilityContainer.setupZookeeperAuth(conf));
             Assertions.assertTrue(e.getMessage().contains("does not exist"));
           } catch (IOException e) {
             throw new RuntimeException(e);
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainerIT.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainerIT.java
new file mode 100644
index 000000000..0378ffebe
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainerIT.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.DataSourceFactory;
+import org.apache.amoro.server.table.DerbyPersistence;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Derby-based integration tests for the DATABASE HA container: leader 
election, failover on crash,
+ * non-preemptive recovery, and demotion on heartbeat loss. Calls
+ * org.apache.amoro.server.ha.DataBaseHighAvailabilityContainer via reflection
+ * (waitLeaderShip/waitFollowerShip/close). If the class is absent on this 
branch (PR not merged),
+ * safely skip all tests in @BeforeClass.
+ */
+public class DataBaseHighAvailabilityContainerIT {
+
+  @ClassRule public static ExternalResource derby = new DerbyPersistence();
+
+  private static boolean databaseHAAvailable;
+
+  @BeforeClass
+  public static void checkDatabaseHAExists() {
+    databaseHAAvailable =
+        
classExists("org.apache.amoro.server.ha.DataBaseHighAvailabilityContainer");
+    Assume.assumeTrue(
+        "Skip DATABASE HA container tests because class not found (PR not 
merged)",
+        databaseHAAvailable);
+    // If ha_lease is not present in the Derby init scripts, skip to avoid 
false positives.
+    Assume.assumeTrue("ha_lease table not present", 
checkHaLeaseTablePresent());
+  }
+
+  @After
+  public void afterEach() {
+    // DerbyPersistence.after() automatically truncates all tables; ensure 
containers are closed.
+  }
+
+  @Test
+  public void testNormalElectionSingleLeaderAcrossTwoNodes() throws Exception {
+    Configurations conf1 =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+    Configurations conf2 =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+
+    HighAvailabilityContainer c1 = 
HighAvailabilityContainerFactory.create(conf1);
+    HighAvailabilityContainer c2 = 
HighAvailabilityContainerFactory.create(conf2);
+
+    ExecutorService es = Executors.newFixedThreadPool(2);
+    Future<?> f1 =
+        es.submit(
+            () -> {
+              try {
+                c1.waitLeaderShip();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+    Future<?> f2 =
+        es.submit(
+            () -> {
+              try {
+                c2.waitLeaderShip();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+
+    boolean c1BecameLeader = false;
+    boolean c2BecameLeader = false;
+    try {
+      Object done = awaitAny(f1, f2, Duration.ofSeconds(15));
+      if (done == f1) {
+        c1BecameLeader = true;
+      } else if (done == f2) {
+        c2BecameLeader = true;
+      }
+      Assert.assertTrue("Exactly one node should become leader", 
c1BecameLeader ^ c2BecameLeader);
+
+      // Assert a unique leader row in ha_lease.
+      assertUniqueLeaderRow();
+    } finally {
+      // End the other waiting task.
+      try {
+        c1.close();
+      } catch (Throwable ignored) {
+      }
+      try {
+        c2.close();
+      } catch (Throwable ignored) {
+      }
+      f1.cancel(true);
+      f2.cancel(true);
+      es.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testFailoverWhenLeaderCrashed() throws Exception {
+    Configurations conf1 =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+    Configurations conf2 =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+
+    HighAvailabilityContainer leader = 
HighAvailabilityContainerFactory.create(conf1);
+
+    // Let the first node become leader.
+    leader.waitLeaderShip();
+    assertUniqueLeaderRow();
+
+    HighAvailabilityContainer follower = 
HighAvailabilityContainerFactory.create(conf2);
+    Thread.sleep(7000); // TTL(6s) + Δ
+
+    // Simulate crash by stopping the leader heartbeat.
+    leader.close();
+
+    // After TTL, the other node should take over as leader.
+    Thread.sleep(7000); // TTL(6s) + Δ
+    follower.waitLeaderShip();
+    assertUniqueLeaderRow();
+
+    // Cleanup.
+    follower.close();
+  }
+
+  @Test
+  public void testLeaderRecoveryNoPreempt() throws Exception {
+    Configurations confLeader =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+    Configurations confFollower =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+
+    HighAvailabilityContainer leader = 
HighAvailabilityContainerFactory.create(confLeader);
+
+    // Make the first node leader, then close it to trigger failover.
+    leader.waitLeaderShip();
+    assertUniqueLeaderRow();
+    HighAvailabilityContainer follower = 
HighAvailabilityContainerFactory.create(confFollower);
+    Thread.sleep(7000);
+
+    leader.close();
+
+    Thread.sleep(7000);
+    follower.waitLeaderShip();
+    assertUniqueLeaderRow();
+
+    // Original leader recovers (create a new container as recovery node).
+    Configurations confRecovery =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+    HighAvailabilityContainer recovery = 
HighAvailabilityContainerFactory.create(confRecovery);
+
+    // Recovery should be non-preemptive while the current leader is healthy; 
wait briefly to
+    // confirm it does not acquire leadership.
+    ExecutorService es = Executors.newSingleThreadExecutor();
+    Future<?> future =
+        es.submit(
+            () -> {
+              try {
+                recovery.waitLeaderShip();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+    try {
+      try {
+        future.get(3, TimeUnit.SECONDS);
+        Assert.fail("Recovery node should not acquire leadership while current 
leader is healthy");
+      } catch (TimeoutException expected) {
+        // Expected: Timeout means it did not become leader.
+      }
+    } finally {
+      // Stop the current leader, wait TTL again; the recovery node should 
become leader in the next
+      // cycle.
+      follower.close();
+      Thread.sleep(7000);
+      recovery.waitLeaderShip();
+      assertUniqueLeaderRow();
+      recovery.close();
+      es.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testHeartbeatLossTriggersDemotion() throws Exception {
+    // Set heartbeat > TTL; the next renew will time out and trigger demotion.
+    Configurations conf =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            7000,
+            3000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+
+    HighAvailabilityContainer container = 
HighAvailabilityContainerFactory.create(conf);
+
+    container.waitLeaderShip();
+    assertUniqueLeaderRow();
+
+    // Wait beyond TTL; the container should detect renew failure and trigger 
demotion.
+    ExecutorService es = Executors.newSingleThreadExecutor();
+    Future<?> f =
+        es.submit(
+            () -> {
+              try {
+                container.waitFollowerShip();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+    try {
+      f.get(10, TimeUnit.SECONDS); // Wait for demotion.
+    } finally {
+      container.close();
+      es.shutdownNow();
+    }
+  }
+
+  // ------------------------- helpers -------------------------
+
+  private static boolean classExists(String name) {
+    try {
+      Class.forName(name);
+      return true;
+    } catch (ClassNotFoundException e) {
+      return false;
+    }
+  }
+
+  public static boolean checkHaLeaseTablePresent() {
+    Configurations conf =
+        HATestConfigs.buildDataBaseHAConfig(
+            "default",
+            2000,
+            6000,
+            "127.0.0.1",
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort(),
+            HATestConfigs.randomEphemeralPort());
+    try {
+      DataSource ds = DataSourceFactory.createDataSource(conf);
+      try (Connection conn = ds.getConnection();
+          Statement st = conn.createStatement()) {
+        try (ResultSet rs =
+            st.executeQuery("SELECT 1 FROM SYS.SYSTABLES WHERE 
UPPER(TABLENAME) = 'HA_LEASE'")) {
+          return rs.next();
+        }
+      }
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  private static Object awaitAny(Future<?> f1, Future<?> f2, Duration timeout) 
throws Exception {
+    long deadline = System.nanoTime() + timeout.toNanos();
+    while (System.nanoTime() < deadline) {
+      if (f1.isDone()) {
+        return f1;
+      }
+      if (f2.isDone()) {
+        return f2;
+      }
+      Thread.sleep(100);
+    }
+    throw new TimeoutException("Leader election did not complete in time");
+  }
+
+  private static void assertUniqueLeaderRow() throws Exception {
+    // Assert using ha_lease: unique leader row (count=1).
+    Configurations conf = new Configurations();
+    conf.set(
+        AmoroManagementConf.DB_CONNECTION_URL, 
HATestConfigs.detectDerbyUrlFromSqlSessionFactory());
+    conf.set(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY);
+    conf.set(AmoroManagementConf.DB_DRIVER_CLASS_NAME, 
"org.apache.derby.jdbc.EmbeddedDriver");
+    DataSource ds = DataSourceFactory.createDataSource(conf);
+    try (Connection conn = ds.getConnection();
+        Statement st = conn.createStatement()) {
+      try (ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM HA_LEASE")) {
+        Assert.assertTrue("ha_lease table should have at least one row", 
rs.next());
+        int cnt = rs.getInt(1);
+        Assert.assertTrue("ha_lease should contain exactly one active leader 
row", cnt >= 1);
+      }
+    }
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/DatabaseHaEdgeCasesIT.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DatabaseHaEdgeCasesIT.java
new file mode 100644
index 000000000..1dda86bbf
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DatabaseHaEdgeCasesIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.DataSourceFactory;
+import org.apache.amoro.server.table.DerbyPersistence;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Concurrency and edge-case integration tests for the DATABASE high-availability container, run
+ * against the embedded Derby database provided by {@link DerbyPersistence}.
+ *
+ * <p>The whole class is skipped (via {@link Assume}) when {@code DataBaseHighAvailabilityContainer}
+ * is not on the classpath or the {@code HA_LEASE} table is not present.
+ */
+public class DatabaseHaEdgeCasesIT {
+
+  /** JDBC driver used for the direct (non-MyBatis) inspection/manipulation queries below. */
+  private static final String DERBY_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
+
+  @ClassRule public static ExternalResource derby = new DerbyPersistence();
+
+  @BeforeClass
+  public static void checkPrerequisites() {
+    Assume.assumeTrue(classExists("org.apache.amoro.server.ha.DataBaseHighAvailabilityContainer"));
+    Assume.assumeTrue(
+        "ha_lease table not present",
+        DataBaseHighAvailabilityContainerIT.checkHaLeaseTablePresent());
+  }
+
+  /**
+   * Two containers race for leadership: exactly one {@code waitLeaderShip()} call may return while
+   * the other stays blocked.
+   */
+  @Test
+  public void testConcurrentAcquireRace() throws Exception {
+    Configurations conf1 = newHaConfig();
+    Configurations conf2 = newHaConfig();
+
+    HighAvailabilityContainer c1 = HighAvailabilityContainerFactory.create(conf1);
+    HighAvailabilityContainer c2 = HighAvailabilityContainerFactory.create(conf2);
+    ExecutorService es = Executors.newFixedThreadPool(2);
+    try {
+      Future<?> f1 =
+          es.submit(
+              () -> {
+                c1.waitLeaderShip();
+                return null;
+              });
+      Future<?> f2 =
+          es.submit(
+              () -> {
+                c2.waitLeaderShip();
+                return null;
+              });
+
+      Future<?> winner = awaitAny(f1, f2, 60);
+      // A future is also "done" when its task threw; get() rethrows that failure instead of
+      // letting it masquerade as a successful election.
+      winner.get();
+      Future<?> loser = winner == f1 ? f2 : f1;
+      Assert.assertFalse(
+          "the losing container should still be waiting for leadership", loser.isDone());
+
+      assertLeaseRowCount(1, 3);
+    } finally {
+      // Nested finally blocks so a failure closing one resource does not leak the others.
+      try {
+        c1.close();
+      } finally {
+        try {
+          c2.close();
+        } finally {
+          es.shutdownNow();
+        }
+      }
+    }
+  }
+
+  /**
+   * After an external writer bumps the lease version, the leader's next heartbeat renewal is
+   * expected to fail and demote the container to follower within 10 seconds.
+   */
+  @Test
+  public void testOptimisticLockRenewConflict() throws Exception {
+    HighAvailabilityContainer container = HighAvailabilityContainerFactory.create(newHaConfig());
+    ExecutorService es = Executors.newSingleThreadExecutor();
+    try {
+      container.waitLeaderShip();
+      // Modify version externally to simulate an optimistic lock conflict.
+      bumpLeaseVersion();
+
+      // Next heartbeat renew should fail and trigger demotion.
+      Future<?> demoted =
+          es.submit(
+              () -> {
+                container.waitFollowerShip();
+                return null;
+              });
+      demoted.get(10, TimeUnit.SECONDS);
+    } finally {
+      try {
+        container.close();
+      } finally {
+        es.shutdownNow();
+      }
+    }
+  }
+
+  // ------------------------- helpers -------------------------
+
+  /** Builds a DATABASE HA config: 2000 ms heartbeat, 6000 ms lease TTL, random local ports. */
+  private static Configurations newHaConfig() {
+    return HATestConfigs.buildDataBaseHAConfig(
+        "default",
+        2000,
+        6000,
+        "127.0.0.1",
+        HATestConfigs.randomEphemeralPort(),
+        HATestConfigs.randomEphemeralPort(),
+        HATestConfigs.randomEphemeralPort());
+  }
+
+  /** Returns whether {@code name} can be loaded by the current class loader. */
+  private static boolean classExists(String name) {
+    try {
+      Class.forName(name);
+      return true;
+    } catch (ClassNotFoundException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Polls two futures until one is done (normally, exceptionally, or by cancellation); {@code f1}
+   * is checked first.
+   *
+   * @throws TimeoutException if neither is done within {@code seconds}
+   */
+  private static Future<?> awaitAny(Future<?> f1, Future<?> f2, int seconds) throws Exception {
+    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds);
+    while (true) {
+      if (f1.isDone()) {
+        return f1;
+      }
+      if (f2.isDone()) {
+        return f2;
+      }
+      // Overflow-safe comparison (see System.nanoTime()); futures get one last check after the
+      // final sleep before we give up.
+      if (System.nanoTime() - deadline >= 0) {
+        throw new TimeoutException("concurrent acquire did not finish in time");
+      }
+      Thread.sleep(100);
+    }
+  }
+
+  /** Asserts that HA_LEASE holds between {@code minExpected} and {@code maxExpected} rows. */
+  private static void assertLeaseRowCount(int minExpected, int maxExpected) throws Exception {
+    DataSource ds = createDerbyDataSource();
+    try (Connection conn = ds.getConnection();
+        Statement st = conn.createStatement();
+        ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM HA_LEASE")) {
+      Assert.assertTrue(rs.next());
+      int cnt = rs.getInt(1);
+      Assert.assertTrue(
+          "ha_lease row count unexpected: " + cnt, cnt >= minExpected && cnt <= maxExpected);
+    } finally {
+      closeDataSource(ds);
+    }
+  }
+
+  /** Increments VERSION on every HA_LEASE row, as a competing writer would. */
+  private static void bumpLeaseVersion() throws Exception {
+    DataSource ds = createDerbyDataSource();
+    try (Connection conn = ds.getConnection();
+        Statement st = conn.createStatement()) {
+      int updated = st.executeUpdate("UPDATE HA_LEASE SET VERSION = VERSION + 1");
+      Assert.assertTrue("no ha_lease row to bump; was a leader elected?", updated > 0);
+      // JDBC forbids commit() while auto-commit is enabled; in that mode the update is already
+      // committed.
+      if (!conn.getAutoCommit()) {
+        conn.commit();
+      }
+    } finally {
+      closeDataSource(ds);
+    }
+  }
+
+  /** Creates a standalone data source pointing at the Derby database used by the AMS under test. */
+  private static DataSource createDerbyDataSource() throws Exception {
+    Configurations conf = new Configurations();
+    conf.set(
+        AmoroManagementConf.DB_CONNECTION_URL, HATestConfigs.detectDerbyUrlFromSqlSessionFactory());
+    conf.set(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY);
+    conf.set(AmoroManagementConf.DB_DRIVER_CLASS_NAME, DERBY_DRIVER);
+    return DataSourceFactory.createDataSource(conf);
+  }
+
+  /** Closes {@code ds} if its implementation is closeable (e.g. a pool), releasing connections. */
+  private static void closeDataSource(DataSource ds) throws Exception {
+    if (ds instanceof AutoCloseable) {
+      ((AutoCloseable) ds).close();
+    }
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/HATestConfigs.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/HATestConfigs.java
new file mode 100644
index 000000000..7b7f00ee0
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/ha/HATestConfigs.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
+import org.apache.ibatis.session.SqlSession;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Random;
+
+/**
+ * Test helpers for building DATABASE high-availability configurations and for locating the Derby
+ * database that backs the test {@code SqlSessionFactoryProvider}.
+ *
+ * <p>The configurations built here contain no database connection settings; callers that need the
+ * JDBC URL can obtain it from {@link #detectDerbyUrlFromSqlSessionFactory()}.
+ */
+public final class HATestConfigs {
+  /** Shared generator for {@link #randomEphemeralPort()}; {@link Random} is thread-safe. */
+  private static final Random PORT_RANDOM = new Random();
+
+  private HATestConfigs() {}
+
+  /**
+   * Builds a DATABASE HA configuration for containers/AMS. Pass a short heartbeat and lease TTL to
+   * keep tests fast.
+   *
+   * @param clusterName HA cluster name.
+   * @param heartbeatMillis Heartbeat interval in milliseconds.
+   * @param ttlMillis Lease TTL in milliseconds.
+   * @param exposeHost server-expose-host.
+   * @param tablePort Table Thrift port.
+   * @param optimizingPort Optimizing Thrift port.
+   * @param httpPort HTTP port.
+   * @return Configurations with HA enabled, HA type {@link AmoroManagementConf#HA_TYPE_DATABASE},
+   *     the given heartbeat/TTL and cluster name, and the given expose host and ports.
+   */
+  public static Configurations buildDataBaseHAConfig(
+      String clusterName,
+      int heartbeatMillis,
+      int ttlMillis,
+      String exposeHost,
+      int tablePort,
+      int optimizingPort,
+      int httpPort) {
+    Configurations conf = new Configurations();
+
+    // HA switches.
+    conf.set(AmoroManagementConf.HA_ENABLE, true);
+    conf.setString(AmoroManagementConf.HA_TYPE, AmoroManagementConf.HA_TYPE_DATABASE);
+    // Heartbeat and TTL are set through their string keys as raw millisecond values.
+    conf.setString(
+        AmoroManagementConf.HA_HEARTBEAT_INTERVAL.key(), String.valueOf(heartbeatMillis));
+    conf.setString(AmoroManagementConf.HA_LEASE_TTL.key(), String.valueOf(ttlMillis));
+    conf.setString(AmoroManagementConf.HA_CLUSTER_NAME, clusterName);
+
+    // Exposed host and ports.
+    conf.set(AmoroManagementConf.SERVER_EXPOSE_HOST, exposeHost);
+    conf.set(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, tablePort);
+    conf.set(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT, optimizingPort);
+    conf.set(AmoroManagementConf.HTTP_SERVER_PORT, httpPort);
+
+    return conf;
+  }
+
+  /**
+   * Detects the JDBC URL behind {@link SqlSessionFactoryProvider} from its connection metadata.
+   *
+   * @return the connection URL, or a fixed Derby fallback URL if the metadata cannot be read
+   */
+  public static String detectDerbyUrlFromSqlSessionFactory() {
+    try (SqlSession session = SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+      // The session owns this connection and closes it when the session is closed; closing it
+      // here as well would release it twice.
+      Connection conn = session.getConnection();
+      return conn.getMetaData().getURL();
+    } catch (SQLException e) {
+      // Fallback to a default path as a safeguard; does not affect skip behavior.
+      return "jdbc:derby:/tmp/amoro/derby;create=true";
+    }
+  }
+
+  /**
+   * Returns a random port in [14000, 18000) to reduce the chance of collisions between tests. The
+   * port is not checked for availability.
+   */
+  public static int randomEphemeralPort() {
+    return PORT_RANDOM.nextInt(4000) + 14000;
+  }
+}
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index fa763689a..8e2e780d6 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -71,7 +71,10 @@ table td:last-child, table th:last-child { width: 40%; 
word-break: break-all; }
 | ha.cluster-name | default | Amoro management service cluster name. |
 | ha.connection-timeout | 5 min | The Zookeeper connection timeout in 
milliseconds. |
 | ha.enabled | false | Whether to enable high availability mode. |
+| ha.heartbeat-interval | 10 s | HA heartbeat interval. |
+| ha.lease-ttl | 30 s | TTL of HA lease. |
 | ha.session-timeout | 30 s | The Zookeeper session timeout in milliseconds. |
+| ha.type | zk | High availability implementation type: zk or database. |
 | ha.zookeeper-address |  | The Zookeeper address used for high availability. |
 | ha.zookeeper-auth-keytab |  | The Zookeeper authentication keytab file path 
when auth type is KERBEROS. |
 | ha.zookeeper-auth-principal |  | The Zookeeper authentication principal when 
auth type is KERBEROS. |

Reply via email to