This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new e0246aae0 [AMORO-3998]support DATABASE HA SERVICE (#3997)
e0246aae0 is described below
commit e0246aae056d823188f818bc6396fafcc6f2314b
Author: LiangDai-Mars <[email protected]>
AuthorDate: Mon Dec 22 16:49:26 2025 +0800
[AMORO-3998]support DATABASE HA SERVICE (#3997)
support DATABASE HA SERVICE
Co-authored-by: dailiang <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 22 ++
.../amoro/server/AmoroManagementConfValidator.java | 21 +-
.../apache/amoro/server/AmoroServiceContainer.java | 4 +-
.../ha/DataBaseHighAvailabilityContainer.java | 311 +++++++++++++++++
.../amoro/server/ha/HighAvailabilityContainer.java | 45 +++
.../ha/HighAvailabilityContainerFactory.java | 65 ++++
.../server/ha/NoopHighAvailabilityContainer.java | 45 +++
.../ZkHighAvailabilityContainer.java} | 12 +-
.../amoro/server/persistence/HaLeaseMeta.java | 158 +++++++++
.../persistence/SqlSessionFactoryProvider.java | 2 +
.../server/persistence/mapper/HaLeaseMapper.java | 197 +++++++++++
.../src/main/resources/derby/ams-derby-init.sql | 17 +-
.../src/main/resources/mysql/ams-mysql-init.sql | 14 +
.../main/resources/postgres/ams-postgres-init.sql | 25 +-
.../server/HighAvailabilityContainerTest.java | 5 +-
.../ha/DataBaseHighAvailabilityContainerIT.java | 370 +++++++++++++++++++++
.../amoro/server/ha/DatabaseHaEdgeCasesIT.java | 208 ++++++++++++
.../org/apache/amoro/server/ha/HATestConfigs.java | 98 ++++++
docs/configuration/ams-config.md | 3 +
19 files changed, 1609 insertions(+), 13 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 8ba5439bd..579142b2e 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -204,12 +204,24 @@ public class AmoroManagementConf {
.defaultValue(false)
.withDescription("Whether to enable high availability mode.");
+  /** Selects the HA implementation; accepted values are "zk" and "database". */
+  public static final ConfigOption<String> HA_TYPE =
+      ConfigOptions.key("ha.type")
+          .stringType()
+          // Qualified because HA_TYPE_ZK is declared later in this class; a simple-name forward
+          // reference in a static field initializer would not compile.
+          .defaultValue(AmoroManagementConf.HA_TYPE_ZK)
+          .withDescription("High availability implementation type: zk or database.");
+
public static final ConfigOption<String> HA_CLUSTER_NAME =
ConfigOptions.key("ha.cluster-name")
.stringType()
.defaultValue("default")
.withDescription("Amoro management service cluster name.");
+  /**
+   * Interval between lease acquire/renew attempts of the database HA container (ha.type=database);
+   * it is truncated to whole seconds there.
+   */
+  public static final ConfigOption<Duration> HA_HEARTBEAT_INTERVAL =
+      ConfigOptions.key("ha.heartbeat-interval")
+          .durationType()
+          .defaultValue(Duration.ofSeconds(10))
+          .withDescription("HA heartbeat interval.");
+
public static final ConfigOption<String> HA_ZOOKEEPER_ADDRESS =
ConfigOptions.key("ha.zookeeper-address")
.stringType()
@@ -247,6 +259,12 @@ public class AmoroManagementConf {
.defaultValue(Duration.ofSeconds(300))
.withDescription("The Zookeeper connection timeout in
milliseconds.");
+  /**
+   * Lifetime of the database HA lease after each successful acquire/renew (ha.type=database). It
+   * should exceed ha.heartbeat-interval so the leader can renew before the lease expires.
+   */
+  public static final ConfigOption<Duration> HA_LEASE_TTL =
+      ConfigOptions.key("ha.lease-ttl")
+          .durationType()
+          .defaultValue(Duration.ofSeconds(30))
+          .withDescription("TTL of HA lease.");
+
public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
ConfigOptions.key("thrift-server.table-service.bind-port")
.intType()
@@ -536,6 +554,10 @@ public class AmoroManagementConf {
public static final String DB_TYPE_MYSQL = "mysql";
public static final String DB_TYPE_POSTGRES = "postgres";
+ // HA config: accepted values of ha.type (compared after lower-casing)
+ /** ha.type value selecting the ZooKeeper-based HA container. */
+ public static final String HA_TYPE_ZK = "zk";
+ /** ha.type value selecting the database-lease-based HA container. */
+ public static final String HA_TYPE_DATABASE = "database";
+
// terminal config
public static final List<String> TERMINAL_BACKEND_VALUES =
Arrays.asList("local", "kyuubi", "custom");
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
index bae28f761..1e78520f0 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
@@ -49,11 +49,24 @@ public class AmoroManagementConfValidator {
// HA config
if (configurations.getBoolean(AmoroManagementConf.HA_ENABLE)) {
- if
("".equals(configurations.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS)))
{
+ String type =
configurations.getString(AmoroManagementConf.HA_TYPE).toLowerCase();
+ if (!AmoroManagementConf.HA_TYPE_ZK.equals(type)
+ && !AmoroManagementConf.HA_TYPE_DATABASE.equals(type)) {
throw new IllegalArgumentException(
- AmoroManagementConf.HA_ZOOKEEPER_ADDRESS.key()
- + " must be configured when you enable "
- + "the ams high availability");
+ String.format("Illegal ha.type: %s, zk or database is available",
type));
+ }
+ if (AmoroManagementConf.HA_TYPE_ZK.equals(type)) {
+ if
("".equals(configurations.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS)))
{
+ throw new IllegalArgumentException(
+ AmoroManagementConf.HA_ZOOKEEPER_ADDRESS.key()
+ + " must be configured when ha.type=zk");
+ }
+ } else {
+ String dbUrl =
configurations.getString(AmoroManagementConf.DB_CONNECTION_URL);
+ if ("".equals(dbUrl)) {
+ throw new IllegalArgumentException(
+ "database.url must be configured when ha.type=database");
+ }
}
}
// terminal config
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 2fd9f7560..18cdf8666 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -36,6 +36,8 @@ import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import org.apache.amoro.server.ha.HighAvailabilityContainerFactory;
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.persistence.DataSourceFactory;
@@ -115,7 +117,7 @@ public class AmoroServiceContainer {
public AmoroServiceContainer() throws Exception {
initConfig();
- haContainer = new HighAvailabilityContainer(serviceConfig);
+ // The factory returns a no-op, ZooKeeper or database container based on ha.enabled / ha.type.
+ haContainer = HighAvailabilityContainerFactory.create(serviceConfig);
}
public static void main(String[] args) {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
new file mode 100644
index 000000000..70d1253c6
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.HaLeaseMeta;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * High availability (HA) container backed by a lease row in the AMS database.
+ *
+ * <p>How it works:
+ *
+ * <ul>
+ *   <li>A single heartbeat thread runs at a fixed rate (ha.heartbeat-interval, truncated to whole
+ *       seconds, at least one second). A follower tries to acquire the lease; the leader renews it.
+ *   <li>The lease row ({@link HaLeaseMeta}, accessed through {@link HaLeaseMapper}) stores an
+ *       expiration timestamp (ha.lease-ttl after the last acquire/renew) and a row version that
+ *       renewals use as an optimistic lock.
+ *   <li>After acquiring the lease the node writes the TABLE_SERVICE and OPTIMIZING_SERVICE server
+ *       info rows (best effort) and then releases the threads blocked in {@link #waitLeaderShip()}.
+ *   <li>When a renewal is rejected, or renewals keep failing until the lease this node last wrote
+ *       has expired, the node is demoted and {@link #waitFollowerShip()} returns. A demoted node
+ *       never tries to acquire the lease again.
+ * </ul>
+ *
+ * <p>Timestamps come from each node's local clock ({@link System#currentTimeMillis()}), so the
+ * clocks of the AMS nodes presumably need to be reasonably in sync (NOTE(review): confirm against
+ * the lease SQL in HaLeaseMapper).
+ */
+public class DataBaseHighAvailabilityContainer extends PersistentBase
+    implements HighAvailabilityContainer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBaseHighAvailabilityContainer.class);
+  private static final String AMS_SERVICE = "AMS";
+  private static final String TABLE_SERVICE = "TABLE_SERVICE";
+  private static final String OPTIMIZING_SERVICE = "OPTIMIZING_SERVICE";
+
+  private final Configurations serviceConfig;
+  private final ScheduledExecutorService executor;
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+  /** Prevent re-gaining leadership once the lease is lost. */
+  private final AtomicBoolean leadershipRevoked = new AtomicBoolean(false);
+
+  /**
+   * Counted down when leadership is lost. Always assigned before {@link #isLeader} becomes true, so
+   * a thread released from {@link #waitLeaderShip()} never finds it null.
+   */
+  private volatile CountDownLatch followerLatch;
+
+  private final Lock leaderLock = new ReentrantLock();
+  private final Condition leaderCondition = leaderLock.newCondition();
+
+  private final String clusterName;
+  private final long heartbeatIntervalSeconds;
+  private final long ttlSeconds;
+
+  private final String nodeId;
+  private final String nodeIp;
+  private final AmsServerInfo tableServiceServerInfo;
+  private final AmsServerInfo optimizingServiceServerInfo;
+
+  /** Local lease version (optimistic lock). Null means not acquired yet. */
+  private volatile Integer leaseVersion = null;
+
+  /**
+   * Expiration time (epoch millis) of the lease this node last acquired or renewed successfully.
+   * While renewals fail with errors, the node keeps leadership only until this instant.
+   */
+  private volatile long leaseExpireAtMillis = 0L;
+
+  /**
+   * Creates the container and schedules the lease heartbeat; the first acquire attempt runs five
+   * seconds after construction.
+   *
+   * @param serviceConfig AMS configuration (cluster name, HA timings, expose host and ports)
+   */
+  public DataBaseHighAvailabilityContainer(Configurations serviceConfig) {
+    this.serviceConfig = serviceConfig;
+    this.clusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+    // Both options are Durations; the heartbeat works in whole seconds.
+    this.heartbeatIntervalSeconds =
+        serviceConfig.get(AmoroManagementConf.HA_HEARTBEAT_INTERVAL).getSeconds();
+    this.ttlSeconds = serviceConfig.get(AmoroManagementConf.HA_LEASE_TTL).getSeconds();
+
+    this.nodeIp = serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST);
+    int tableThriftPort =
+        serviceConfig.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT);
+    int optimizingThriftPort =
+        serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT);
+    int restPort = serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT);
+
+    // IP + optimizing port + random suffix, so that several instances on the same host never
+    // share a node id.
+    this.nodeId = nodeIp + ":" + optimizingThriftPort + ":" + UUID.randomUUID();
+
+    this.tableServiceServerInfo = buildServerInfo(nodeIp, tableThriftPort, restPort);
+    this.optimizingServiceServerInfo = buildServerInfo(nodeIp, optimizingThriftPort, restPort);
+
+    // Named thread so the heartbeat is easy to spot in thread dumps.
+    this.executor =
+        Executors.newSingleThreadScheduledExecutor(
+            runnable -> {
+              Thread thread = new Thread(runnable, "amoro-database-ha-heartbeat");
+              thread.setDaemon(false);
+              return thread;
+            });
+    this.executor.scheduleAtFixedRate(
+        new HeartbeatRunnable(), 5, Math.max(1, heartbeatIntervalSeconds), TimeUnit.SECONDS);
+  }
+
+  /**
+   * Blocks until this node holds the lease. By the time this returns, the TABLE_SERVICE and
+   * OPTIMIZING_SERVICE server info has been written (best effort).
+   */
+  @Override
+  public void waitLeaderShip() throws InterruptedException {
+    LOG.info("Waiting to become the leader of AMS (Database lease)");
+    leaderLock.lock();
+    try {
+      while (!isLeader.get()) {
+        leaderCondition.await();
+      }
+    } finally {
+      leaderLock.unlock();
+    }
+    LOG.info("Became the leader of AMS (Database lease)");
+  }
+
+  /**
+   * Blocks until this node loses the lease. Returns immediately if this node has never been the
+   * leader (no follower latch yet).
+   */
+  @Override
+  public void waitFollowerShip() throws InterruptedException {
+    LOG.info("Waiting to become the follower of AMS (Database lease)");
+    CountDownLatch latch = followerLatch;
+    if (latch != null) {
+      latch.await();
+    }
+    LOG.info("Became the follower of AMS (Database lease)");
+  }
+
+  /**
+   * Stops scheduling further heartbeats. The lease row is not released here, so another node can
+   * presumably take over only after the lease expires.
+   */
+  @Override
+  public void close() {
+    try {
+      if (executor != null && !executor.isShutdown()) {
+        executor.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.error("Close Database HighAvailabilityContainer failed", e);
+    }
+  }
+
+  /** One heartbeat tick: acquire the lease as a candidate, or renew it as the leader. */
+  private class HeartbeatRunnable implements Runnable {
+    @Override
+    public void run() {
+      try {
+        if (leadershipRevoked.get()) {
+          // Lease already revoked; this node never tries to become the leader again.
+          return;
+        }
+        long now = System.currentTimeMillis();
+        long newExpireTs = now + TimeUnit.SECONDS.toMillis(ttlSeconds);
+
+        if (!isLeader.get()) {
+          boolean success = tryAcquireLease(newExpireTs, now);
+          LOG.info(
+              "Try to acquire AMS lease: success={}, cluster={}, nodeId={}, nodeIp={}",
+              success,
+              clusterName,
+              nodeId,
+              nodeIp);
+          if (success) {
+            onLeaderGained(now);
+          }
+          return;
+        }
+
+        boolean renewed;
+        try {
+          renewed = tryRenewLease(newExpireTs, now);
+        } catch (RuntimeException e) {
+          if (System.currentTimeMillis() < leaseExpireAtMillis) {
+            // The lease written last is still valid: keep leadership and retry next tick.
+            LOG.warn(
+                "Failed to renew AMS lease, retry before it expires at {}, cluster={}, nodeId={}",
+                leaseExpireAtMillis,
+                clusterName,
+                nodeId,
+                e);
+            return;
+          }
+          // The lease may already have expired and been taken by another node; step down rather
+          // than risk two leaders.
+          LOG.error(
+              "Failed to renew AMS lease before it expired, cluster={}, nodeId={}",
+              clusterName,
+              nodeId,
+              e);
+          renewed = false;
+        }
+        LOG.info(
+            "Try to renew AMS lease: success={}, cluster={}, nodeId={}",
+            renewed,
+            clusterName,
+            nodeId);
+        if (!renewed && isLeader.get()) {
+          // Leadership lost: demote; no System.exit
+          onLeaderLost();
+        }
+      } catch (Throwable t) {
+        LOG.error("Database HA heartbeat failed", t);
+      }
+    }
+  }
+
+  /** Tries to take the AMS lease; on success remembers its expiration and row version. */
+  private boolean tryAcquireLease(long newExpireTs, long now) {
+    // Insert the lease row if it is absent so that acquireLease has a row to update.
+    // NOTE(review): the inserted row already names this node with a future expiry; confirm that
+    // acquireLease accepts such a row.
+    doAsIgnoreError(
+        HaLeaseMapper.class,
+        mapper ->
+            mapper.insertIfAbsent(
+                new HaLeaseMeta(
+                    clusterName, AMS_SERVICE, nodeId, nodeIp, "", newExpireTs, 0, now)));
+
+    // Acquire when the lease is expired or free.
+    int affected =
+        updateAs(
+                HaLeaseMapper.class,
+                mapper ->
+                    mapper.acquireLease(
+                        clusterName,
+                        AMS_SERVICE,
+                        nodeId,
+                        nodeIp,
+                        JacksonUtil.toJSONString(tableServiceServerInfo),
+                        newExpireTs,
+                        now))
+            .intValue();
+    if (affected != 1) {
+      return false;
+    }
+    leaseExpireAtMillis = newExpireTs;
+    // Remember the current row version for the optimistic-lock renewals that follow.
+    HaLeaseMeta lease =
+        getAs(HaLeaseMapper.class, mapper -> mapper.selectLease(clusterName, AMS_SERVICE));
+    leaseVersion = versionOf(lease);
+    return true;
+  }
+
+  /** Extends the lease held by this node, guarded by the locally known row version. */
+  private boolean tryRenewLease(long newExpireTs, long now) {
+    if (leaseVersion == null) {
+      HaLeaseMeta lease =
+          getAs(HaLeaseMapper.class, mapper -> mapper.selectLease(clusterName, AMS_SERVICE));
+      leaseVersion = versionOf(lease);
+    }
+    int expectedVersion = leaseVersion;
+    int affected =
+        updateAs(
+                HaLeaseMapper.class,
+                mapper ->
+                    mapper.renewLease(
+                        clusterName, AMS_SERVICE, nodeId, expectedVersion, newExpireTs, now))
+            .intValue();
+    if (affected == 1) {
+      leaseVersion = expectedVersion + 1;
+      leaseExpireAtMillis = newExpireTs;
+      return true;
+    }
+    return false;
+  }
+
+  /** Returns the row version of {@code lease}, or 0 when the row or its version is missing. */
+  private static int versionOf(HaLeaseMeta lease) {
+    return lease != null && lease.getVersion() != null ? lease.getVersion() : 0;
+  }
+
+  private void onLeaderGained(long now) {
+    // Assign the follower latch before publishing leadership: a thread released from
+    // waitLeaderShip() may call waitFollowerShip() right away and must find the latch in place.
+    followerLatch = new CountDownLatch(1);
+
+    // Write this node's endpoints before waking the waiters, so they are in place when
+    // waitLeaderShip() returns.
+    publishServerInfo(TABLE_SERVICE, JacksonUtil.toJSONString(tableServiceServerInfo), now);
+    publishServerInfo(
+        OPTIMIZING_SERVICE, JacksonUtil.toJSONString(optimizingServiceServerInfo), now);
+
+    isLeader.set(true);
+    leaderLock.lock();
+    try {
+      leaderCondition.signalAll();
+    } finally {
+      leaderLock.unlock();
+    }
+
+    LOG.info(
+        "Current node become leader, tableService={}, optimizingService={}",
+        tableServiceServerInfo,
+        optimizingServiceServerInfo);
+  }
+
+  /**
+   * Writes {@code infoJson} as the server info row of {@code serviceName}: updates the existing row,
+   * or inserts it when the update touched no row. Best effort: runs through doAsIgnoreError.
+   */
+  private void publishServerInfo(String serviceName, String infoJson, long now) {
+    doAsIgnoreError(
+        HaLeaseMapper.class,
+        mapper -> {
+          int updated =
+              mapper.updateServerInfo(clusterName, serviceName, nodeId, nodeIp, infoJson, now);
+          if (updated == 0) {
+            mapper.insertServerInfoIfAbsent(
+                clusterName, serviceName, nodeId, nodeIp, infoJson, now);
+          }
+        });
+  }
+
+  private void onLeaderLost() {
+    LOG.info(
+        "Current node as leader is disconnected or lost the lease, cluster={}, nodeId={}",
+        clusterName,
+        nodeId);
+    leadershipRevoked.set(true);
+    isLeader.set(false);
+    CountDownLatch latch = followerLatch;
+    if (latch != null) {
+      latch.countDown();
+    }
+  }
+
+  private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) {
+    AmsServerInfo amsServerInfo = new AmsServerInfo();
+    amsServerInfo.setHost(host);
+    amsServerInfo.setRestBindPort(restBindPort);
+    amsServerInfo.setThriftBindPort(thriftBindPort);
+    return amsServerInfo;
+  }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
new file mode 100644
index 000000000..15a005e1f
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+/**
+ * Contract shared by the AMS high availability (HA) implementations.
+ *
+ * <p>An implementation decides when the local AMS node becomes the leader and when it falls back to
+ * being a follower. The implementations in this package are backed by ZooKeeper, by a database
+ * lease, or do nothing at all (HA disabled).
+ */
+public interface HighAvailabilityContainer {
+
+  /**
+   * Waits, blocking the calling thread, until the local node has been elected leader.
+   *
+   * @throws Exception if the wait is interrupted or the HA backend reports an error
+   */
+  void waitLeaderShip() throws Exception;
+
+  /**
+   * Waits, blocking the calling thread, until the local node is a follower again, e.g. after it
+   * has lost its leadership.
+   *
+   * @throws Exception if the wait is interrupted or the HA backend reports an error
+   */
+  void waitFollowerShip() throws Exception;
+
+  /** Releases whatever resources the implementation holds. */
+  void close();
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainerFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainerFactory.java
new file mode 100644
index 000000000..2580b315e
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainerFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for creating HA containers based on configuration.
+ *
+ * <p>Returns a no-op container when ha.enabled=false; otherwise ha.type (matched
+ * case-insensitively) selects "zk" or "database".
+ */
+public final class HighAvailabilityContainerFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HighAvailabilityContainerFactory.class);
+
+  private HighAvailabilityContainerFactory() {}
+
+  /**
+   * Creates an HA container based on the given configuration.
+   *
+   * @param conf service configuration
+   * @return a HA container implementation according to ha.enabled and ha.type
+   * @throws IllegalArgumentException if ha.type is unsupported
+   * @throws RuntimeException if the ZK container cannot be created
+   */
+  public static HighAvailabilityContainer create(Configurations conf) {
+    if (!conf.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+      LOG.info("HA is disabled, use NoopHighAvailabilityContainer");
+      return new NoopHighAvailabilityContainer();
+    }
+    // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
+    String type =
+        conf.getString(AmoroManagementConf.HA_TYPE).toLowerCase(java.util.Locale.ROOT);
+    switch (type) {
+      case AmoroManagementConf.HA_TYPE_ZK:
+        try {
+          return new ZkHighAvailabilityContainer(conf);
+        } catch (Exception e) {
+          throw new RuntimeException("Failed to create ZkHighAvailabilityContainer", e);
+        }
+      case AmoroManagementConf.HA_TYPE_DATABASE:
+        return new DataBaseHighAvailabilityContainer(conf);
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported ha.type: " + type + ", only 'zk' or 'database' are allowed");
+    }
+  }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
new file mode 100644
index 000000000..e8bef7d9c
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * No-op HA container that never blocks and performs no leader election. {@code
+ * HighAvailabilityContainerFactory} returns it when HA is disabled.
+ */
+public class NoopHighAvailabilityContainer implements HighAvailabilityContainer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NoopHighAvailabilityContainer.class);
+
+  /** Returns immediately without blocking. */
+  @Override
+  public void waitLeaderShip() {
+    LOG.info("Noop HA: waitLeaderShip returns immediately");
+  }
+
+  /** Returns immediately without blocking. */
+  @Override
+  public void waitFollowerShip() {
+    LOG.info("Noop HA: waitFollowerShip returns immediately");
+  }
+
+  /** No-op close operation. */
+  @Override
+  public void close() {
+    LOG.info("Noop HA: close");
+  }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
rename to
amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index 7f74d3924..b168eff55 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -16,11 +16,12 @@
* limitations under the License.
*/
-package org.apache.amoro.server;
+package org.apache.amoro.server.ha;
import org.apache.amoro.client.AmsServerInfo;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.properties.AmsHAProperties;
+import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
@@ -48,9 +49,9 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-public class HighAvailabilityContainer implements LeaderLatchListener {
+public class ZkHighAvailabilityContainer implements HighAvailabilityContainer,
LeaderLatchListener {
- public static final Logger LOG =
LoggerFactory.getLogger(HighAvailabilityContainer.class);
+ public static final Logger LOG =
LoggerFactory.getLogger(ZkHighAvailabilityContainer.class);
private final LeaderLatch leaderLatch;
private final CuratorFramework zkClient;
@@ -60,7 +61,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
private final AmsServerInfo optimizingServiceServerInfo;
private volatile CountDownLatch followerLatch;
- public HighAvailabilityContainer(Configurations serviceConfig) throws
Exception {
+ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws
Exception {
if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
String zkServerAddress =
serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS);
int zkSessionTimeout =
@@ -109,6 +110,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
}
}
+ @Override
public void waitLeaderShip() throws Exception {
LOG.info("Waiting to become the leader of AMS");
if (leaderLatch != null) {
@@ -138,6 +140,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
LOG.info("Became the leader of AMS");
}
+ @Override
public void waitFollowerShip() throws Exception {
LOG.info("Waiting to become the follower of AMS");
if (followerLatch != null) {
@@ -146,6 +149,7 @@ public class HighAvailabilityContainer implements
LeaderLatchListener {
LOG.info("Became the follower of AMS");
}
+ @Override
public void close() {
if (leaderLatch != null) {
try {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HaLeaseMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HaLeaseMeta.java
new file mode 100644
index 000000000..510fd0c31
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HaLeaseMeta.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence;
+
+import java.util.Objects;
+
+/**
+ * One row of the HA lease table.
+ *
+ * <p>Columns:
+ *
+ * <ul>
+ *   <li>{@code cluster_name} - AMS cluster name
+ *   <li>{@code service_name} - AMS, TABLE_SERVICE or OPTIMIZING_SERVICE
+ *   <li>{@code node_id} - unique node identifier
+ *   <li>{@code node_ip} - node IP address
+ *   <li>{@code server_info_json} - serialized AmsServerInfo
+ *   <li>{@code lease_expire_ts} - lease expiration timestamp in milliseconds
+ *   <li>{@code version} - row version used for optimistic locking
+ *   <li>{@code updated_at} - last update timestamp in milliseconds
+ * </ul>
+ */
+public class HaLeaseMeta {
+  private String clusterName;
+  private String serviceName;
+  private String nodeId;
+  private String nodeIp;
+  private String serverInfoJson;
+  private Long leaseExpireTs;
+  private Integer version;
+  private Long updatedAt;
+
+  /** Creates an empty row whose fields are populated through the setters. */
+  public HaLeaseMeta() {}
+
+  /** Creates a row with every column set. */
+  public HaLeaseMeta(
+      String clusterName,
+      String serviceName,
+      String nodeId,
+      String nodeIp,
+      String serverInfoJson,
+      Long leaseExpireTs,
+      Integer version,
+      Long updatedAt) {
+    this.clusterName = clusterName;
+    this.serviceName = serviceName;
+    this.nodeId = nodeId;
+    this.nodeIp = nodeIp;
+    this.serverInfoJson = serverInfoJson;
+    this.leaseExpireTs = leaseExpireTs;
+    this.version = version;
+    this.updatedAt = updatedAt;
+  }
+
+  // ---- getters ----
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public String getNodeId() {
+    return nodeId;
+  }
+
+  public String getNodeIp() {
+    return nodeIp;
+  }
+
+  public String getServerInfoJson() {
+    return serverInfoJson;
+  }
+
+  public Long getLeaseExpireTs() {
+    return leaseExpireTs;
+  }
+
+  public Integer getVersion() {
+    return version;
+  }
+
+  public Long getUpdatedAt() {
+    return updatedAt;
+  }
+
+  // ---- setters ----
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public void setNodeId(String nodeId) {
+    this.nodeId = nodeId;
+  }
+
+  public void setNodeIp(String nodeIp) {
+    this.nodeIp = nodeIp;
+  }
+
+  public void setServerInfoJson(String serverInfoJson) {
+    this.serverInfoJson = serverInfoJson;
+  }
+
+  public void setLeaseExpireTs(Long leaseExpireTs) {
+    this.leaseExpireTs = leaseExpireTs;
+  }
+
+  public void setVersion(Integer version) {
+    this.version = version;
+  }
+
+  public void setUpdatedAt(Long updatedAt) {
+    this.updatedAt = updatedAt;
+  }
+
+  /** Two rows are equal when every column is equal (exact class match required). */
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (other == null || other.getClass() != getClass()) {
+      return false;
+    }
+    HaLeaseMeta that = (HaLeaseMeta) other;
+    return Objects.equals(clusterName, that.clusterName)
+        && Objects.equals(serviceName, that.serviceName)
+        && Objects.equals(nodeId, that.nodeId)
+        && Objects.equals(nodeIp, that.nodeIp)
+        && Objects.equals(serverInfoJson, that.serverInfoJson)
+        && Objects.equals(leaseExpireTs, that.leaseExpireTs)
+        && Objects.equals(version, that.version)
+        && Objects.equals(updatedAt, that.updatedAt);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        clusterName,
+        serviceName,
+        nodeId,
+        nodeIp,
+        serverInfoJson,
+        leaseExpireTs,
+        version,
+        updatedAt);
+  }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 49eb9f5db..3086bac5e 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -26,6 +26,7 @@ import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.PlatformFileMapper;
@@ -74,6 +75,7 @@ public class SqlSessionFactoryProvider {
configuration.addMapper(TableBlockerMapper.class);
configuration.addMapper(TableProcessMapper.class);
configuration.addMapper(TableRuntimeMapper.class);
+ configuration.addMapper(HaLeaseMapper.class);
PageInterceptor interceptor = new PageInterceptor();
Properties interceptorProperties = new Properties();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
new file mode 100644
index 000000000..c3ce95d74
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.mapper;
+
+import org.apache.amoro.server.persistence.HaLeaseMeta;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.List;
+
+/**
+ * MyBatis mapper for HA lease table. Provides concrete SQL in annotations.
+ *
+ * <p>Notes: - insertIfAbsent: plain INSERT, duplicate key errors are ignored
by caller via
+ * doAsIgnoreError. - acquireLease: try to acquire leadership when expired or
empty. - renewLease:
+ * optimistic lock renewal by version. - selectLease: map a single row to
{@link HaLeaseMeta}. -
+ * releaseLease: mark lease as expired. - upsertServerInfo: MySQL-style INSERT
... ON DUPLICATE KEY
+ * UPDATE for server info.
+ */
+public interface HaLeaseMapper {
+
+ /**
+ * Insert a lease row. If the primary key already exists, the caller may
ignore the exception.
+ *
+ * @param lease lease entity to insert
+ * @return affected rows
+ */
+ @Insert(
+ "INSERT INTO ha_lease "
+ + "(cluster_name, service_name, node_id, node_ip, server_info_json,
lease_expire_ts, version, updated_at) "
+ + "VALUES (#{lease.clusterName}, #{lease.serviceName},
#{lease.nodeId}, #{lease.nodeIp}, "
+ + "#{lease.serverInfoJson}, #{lease.leaseExpireTs},
#{lease.version}, #{lease.updatedAt})")
+ int insertIfAbsent(@Param("lease") HaLeaseMeta lease);
+
+ /**
+ * Try to acquire leadership by updating the lease only when it is expired
or empty. Increments
+ * version and sets new lease_expire_ts.
+ *
+ * @return affected rows (1 means acquired)
+ */
+ @Update(
+ "UPDATE ha_lease "
+ + "SET node_id = #{nodeId}, node_ip = #{nodeIp}, server_info_json =
#{serverInfoJson}, "
+ + "lease_expire_ts = #{leaseExpireTs}, version = version + 1,
updated_at = #{updatedAt} "
+ + "WHERE cluster_name = #{clusterName} AND service_name =
#{serviceName} "
+ + "AND (lease_expire_ts IS NULL OR lease_expire_ts < #{updatedAt})")
+ int acquireLease(
+ @Param("clusterName") String clusterName,
+ @Param("serviceName") String serviceName,
+ @Param("nodeId") String nodeId,
+ @Param("nodeIp") String nodeIp,
+ @Param("serverInfoJson") String serverInfoJson,
+ @Param("leaseExpireTs") Long leaseExpireTs,
+ @Param("updatedAt") Long updatedAt);
+
+ /**
+ * Renew the lease using optimistic concurrency control. Only succeeds if
the expected version
+ * matches and the lease is not expired.
+ *
+ * @return affected rows (1 means renewed)
+ */
+ @Update(
+ "UPDATE ha_lease "
+ + "SET lease_expire_ts = #{newLeaseExpireTs}, version = version + 1,
updated_at = #{updatedAt} "
+ + "WHERE cluster_name = #{clusterName} AND service_name =
#{serviceName} "
+ + "AND node_id = #{nodeId} AND version = #{expectedVersion} AND
lease_expire_ts > #{updatedAt}")
+ int renewLease(
+ @Param("clusterName") String clusterName,
+ @Param("serviceName") String serviceName,
+ @Param("nodeId") String nodeId,
+ @Param("expectedVersion") Integer expectedVersion,
+ @Param("newLeaseExpireTs") Long newLeaseExpireTs,
+ @Param("updatedAt") Long updatedAt);
+
+ /**
+ * Select current lease for cluster and service.
+ *
+ * @param clusterName cluster name
+ * @param serviceName service name
+ * @return lease row or null
+ */
+ @Select(
+ "SELECT cluster_name, service_name, node_id, node_ip, server_info_json,
lease_expire_ts, version, updated_at "
+ + "FROM ha_lease WHERE cluster_name = #{clusterName} AND
service_name = #{serviceName}")
+ @ResultMap("HaLeaseMetaMap")
+ HaLeaseMeta selectLease(
+ @Param("clusterName") String clusterName, @Param("serviceName") String
serviceName);
+
+ /**
+ * Select current lease for cluster and service.
+ *
+ * @param clusterName cluster name
+ * @param serviceName service name
+ * @return lease row or null
+ */
+ @Select(
+ "SELECT cluster_name, service_name, node_id, node_ip, server_info_json,
lease_expire_ts, version, updated_at "
+ + "FROM ha_lease")
+ @Results(
+ id = "HaLeaseMetaMap",
+ value = {
+ @Result(column = "cluster_name", property = "clusterName"),
+ @Result(column = "service_name", property = "serviceName"),
+ @Result(column = "node_id", property = "nodeId"),
+ @Result(column = "node_ip", property = "nodeIp"),
+ @Result(column = "server_info_json", property = "serverInfoJson"),
+ @Result(column = "lease_expire_ts", property = "leaseExpireTs"),
+ @Result(column = "version", property = "version"),
+ @Result(column = "updated_at", property = "updatedAt")
+ })
+ List<HaLeaseMeta> selectAllLease();
+
+ /**
+ * Release the lease by marking it expired at updatedAt.
+ *
+ * @return affected rows
+ */
+ @Update(
+ "UPDATE ha_lease SET lease_expire_ts = #{updatedAt}, updated_at =
#{updatedAt} "
+ + "WHERE cluster_name = #{clusterName} AND service_name =
#{serviceName} AND node_id = #{nodeId}")
+ int releaseLease(
+ @Param("clusterName") String clusterName,
+ @Param("serviceName") String serviceName,
+ @Param("nodeId") String nodeId,
+ @Param("updatedAt") Long updatedAt);
+
+ /**
+ * Update server info if the row exists (DB-neutral upsert step).
+ *
+ * @return affected rows (1 if updated, 0 if not found)
+ */
+ @Update(
+ "UPDATE ha_lease SET node_id=#{nodeId}, node_ip=#{nodeIp},
server_info_json=#{serverInfoJson}, updated_at=#{updatedAt} "
+ + "WHERE cluster_name=#{clusterName} AND
service_name=#{serviceName}")
+ int updateServerInfo(
+ @Param("clusterName") String clusterName,
+ @Param("serviceName") String serviceName,
+ @Param("nodeId") String nodeId,
+ @Param("nodeIp") String nodeIp,
+ @Param("serverInfoJson") String serverInfoJson,
+ @Param("updatedAt") Long updatedAt);
+
+ /**
+ * Insert server info if absent (DB-neutral upsert step). Sets version=0 for
non-leader entries.
+ */
+ @Insert(
+ "INSERT INTO ha_lease (cluster_name, service_name, node_id, node_ip,
server_info_json, version, updated_at) "
+ + "VALUES (#{clusterName}, #{serviceName}, #{nodeId}, #{nodeIp},
#{serverInfoJson}, 0, #{updatedAt})")
+ int insertServerInfoIfAbsent(
+ @Param("clusterName") String clusterName,
+ @Param("serviceName") String serviceName,
+ @Param("nodeId") String nodeId,
+ @Param("nodeIp") String nodeIp,
+ @Param("serverInfoJson") String serverInfoJson,
+ @Param("updatedAt") Long updatedAt);
+
+ /**
+ * Upsert server info for non-leader services using MySQL syntax. For
Postgres, use: INSERT ... ON
+ * CONFLICT (cluster_name, service_name) DO UPDATE SET ...
+ *
+ * @return affected rows
+ */
+ @Insert(
+ "INSERT INTO ha_lease (cluster_name, service_name, node_id, node_ip,
server_info_json, updated_at) "
+ + "VALUES (#{clusterName}, #{serviceName}, #{nodeId}, #{nodeIp},
#{serverInfoJson}, #{updatedAt}) "
+ + "ON DUPLICATE KEY UPDATE "
+ + "node_id = VALUES(node_id), node_ip = VALUES(node_ip), "
+ + "server_info_json = VALUES(server_info_json), updated_at =
VALUES(updated_at)")
+ int upsertServerInfo(
+ @Param("clusterName") String clusterName,
+ @Param("serviceName") String serviceName,
+ @Param("nodeId") String nodeId,
+ @Param("nodeIp") String nodeIp,
+ @Param("serverInfoJson") String serverInfoJson,
+ @Param("updatedAt") Long updatedAt);
+}
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index a66faf55f..1e7a7e340 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -249,4 +249,19 @@ CREATE TABLE http_session (
max_interval BIGINT,
data_store BLOB,
PRIMARY KEY(session_id, context_path, virtual_host)
-);
\ No newline at end of file
+);
+
CREATE TABLE ha_lease (
    cluster_name     VARCHAR(64)    NOT NULL,
    service_name     VARCHAR(64)    NOT NULL,
    node_id          VARCHAR(256),
    node_ip          VARCHAR(64),
    server_info_json VARCHAR(32672),
    lease_expire_ts  BIGINT,
    version          INTEGER        DEFAULT 0 NOT NULL,
    updated_at       BIGINT         NOT NULL,
    PRIMARY KEY (cluster_name, service_name)
);

CREATE INDEX idx_ha_lease_expire ON ha_lease (lease_expire_ts);
CREATE INDEX idx_ha_lease_node ON ha_lease (node_id);
\ No newline at end of file
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index 50c32326e..f1f91e3a3 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -267,3 +267,17 @@ CREATE TABLE `http_session` (
PRIMARY KEY(`session_id`, `context_path`, `virtual_host`),
KEY `idx_session_expiry` (`expiry_time`)
) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Http session store'
ROW_FORMAT=DYNAMIC;
+
CREATE TABLE IF NOT EXISTS `ha_lease` (
    `cluster_name`     VARCHAR(64)  NOT NULL COMMENT 'AMS cluster name',
    `service_name`     VARCHAR(64)  NOT NULL COMMENT 'Service name (AMS/TABLE_SERVICE/OPTIMIZING_SERVICE)',
    `node_id`          VARCHAR(256) NULL COMMENT 'Unique node identifier (host:port:uuid)',
    `node_ip`          VARCHAR(64)  NULL COMMENT 'Node IP address',
    `server_info_json` TEXT         NULL COMMENT 'JSON encoded server info (AmsServerInfo)',
    `lease_expire_ts`  BIGINT       NULL COMMENT 'Lease expiration timestamp (ms since epoch)',
    `version`          INT          NOT NULL DEFAULT 0 COMMENT 'Optimistic lock version of the lease row',
    `updated_at`       BIGINT       NOT NULL COMMENT 'Last update timestamp (ms since epoch)',
    PRIMARY KEY (`cluster_name`, `service_name`),
    KEY `idx_ha_lease_expire` (`lease_expire_ts`) COMMENT 'Index for querying expired leases',
    KEY `idx_ha_lease_node` (`node_id`) COMMENT 'Index for querying leases by node ID'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='HA lease table for leader election and heartbeat renewal';
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index 190b7d04a..8f13d6862 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -439,4 +439,27 @@ COMMENT ON COLUMN http_session.last_save_time IS 'Last
save time';
COMMENT ON COLUMN http_session.expiry_time IS 'Expiry time';
COMMENT ON COLUMN http_session.max_interval IS 'Max internal';
COMMENT ON COLUMN http_session.data_store IS 'Session data store';
-COMMENT ON TABLE http_session IS 'Http session store';
\ No newline at end of file
+COMMENT ON TABLE http_session IS 'Http session store';
+
CREATE TABLE IF NOT EXISTS ha_lease (
    cluster_name     VARCHAR(64)  NOT NULL,
    service_name     VARCHAR(64)  NOT NULL,
    node_id          VARCHAR(256) NULL,
    node_ip          VARCHAR(64)  NULL,
    server_info_json TEXT         NULL,
    lease_expire_ts  BIGINT       NULL,
    version          INT          NOT NULL DEFAULT 0,
    updated_at       BIGINT       NOT NULL,
    PRIMARY KEY (cluster_name, service_name)
);

CREATE INDEX IF NOT EXISTS idx_ha_lease_expire ON ha_lease (lease_expire_ts);
CREATE INDEX IF NOT EXISTS idx_ha_lease_node ON ha_lease (node_id);

COMMENT ON COLUMN ha_lease.cluster_name IS 'AMS cluster name';
COMMENT ON COLUMN ha_lease.service_name IS 'Service name (AMS/TABLE_SERVICE/OPTIMIZING_SERVICE)';
COMMENT ON COLUMN ha_lease.node_id IS 'Unique node identifier (host:port:uuid)';
COMMENT ON COLUMN ha_lease.node_ip IS 'Node IP address';
COMMENT ON COLUMN ha_lease.server_info_json IS 'JSON encoded server info (AmsServerInfo)';
COMMENT ON COLUMN ha_lease.lease_expire_ts IS 'Lease expiration timestamp (ms since epoch)';
COMMENT ON COLUMN ha_lease.version IS 'Optimistic lock version of the lease row';
COMMENT ON COLUMN ha_lease.updated_at IS 'Last update timestamp (ms since epoch)';
COMMENT ON TABLE ha_lease IS 'HA lease table for leader election and heartbeat renewal';
\ No newline at end of file
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
b/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
index d29a1a9ac..92f206d72 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/HighAvailabilityContainerTest.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
import org.apache.amoro.server.util.KerberizedTestHelper;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
@@ -49,7 +50,7 @@ public class HighAvailabilityContainerTest extends
KerberizedTestHelper {
conf.set(AmoroManagementConf.HA_ZOOKEEPER_AUTH_PRINCIPAL,
principal);
conf.set(AmoroManagementConf.HA_ZOOKEEPER_AUTH_TYPE, "KERBEROS");
- HighAvailabilityContainer.setupZookeeperAuth(conf);
+ ZkHighAvailabilityContainer.setupZookeeperAuth(conf);
Configuration configuration = Configuration.getConfiguration();
AppConfigurationEntry[] entries =
configuration.getAppConfigurationEntry("AmoroZooKeeperClient");
@@ -66,7 +67,7 @@ public class HighAvailabilityContainerTest extends
KerberizedTestHelper {
conf.set(AmoroManagementConf.HA_ZOOKEEPER_AUTH_KEYTAB,
keytab.getName());
IOException e =
assertThrows(
- IOException.class, () ->
HighAvailabilityContainer.setupZookeeperAuth(conf));
+ IOException.class, () ->
ZkHighAvailabilityContainer.setupZookeeperAuth(conf));
Assertions.assertTrue(e.getMessage().contains("does not exist"));
} catch (IOException e) {
throw new RuntimeException(e);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainerIT.java
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainerIT.java
new file mode 100644
index 000000000..0378ffebe
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainerIT.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.DataSourceFactory;
+import org.apache.amoro.server.table.DerbyPersistence;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Derby-based integration tests for the DATABASE HA container: leader
election, failover on crash,
+ * non-preemptive recovery, and demotion on heartbeat loss. Calls
+ * org.apache.amoro.server.ha.DataBaseHighAvailabilityContainer via reflection
+ * (waitLeaderShip/waitFollowerShip/close). If the class is absent on this
branch (PR not merged),
+ * safely skip all tests in @BeforeClass.
+ */
+public class DataBaseHighAvailabilityContainerIT {
+
+ @ClassRule public static ExternalResource derby = new DerbyPersistence();
+
+ private static boolean databaseHAAvailable;
+
+ @BeforeClass
+ public static void checkDatabaseHAExists() {
+ databaseHAAvailable =
+
classExists("org.apache.amoro.server.ha.DataBaseHighAvailabilityContainer");
+ Assume.assumeTrue(
+ "Skip DATABASE HA container tests because class not found (PR not
merged)",
+ databaseHAAvailable);
+ // If ha_lease is not present in the Derby init scripts, skip to avoid
false positives.
+ Assume.assumeTrue("ha_lease table not present",
checkHaLeaseTablePresent());
+ }
+
+ @After
+ public void afterEach() {
+ // DerbyPersistence.after() automatically truncates all tables; ensure
containers are closed.
+ }
+
+ @Test
+ public void testNormalElectionSingleLeaderAcrossTwoNodes() throws Exception {
+ Configurations conf1 =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+ Configurations conf2 =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+
+ HighAvailabilityContainer c1 =
HighAvailabilityContainerFactory.create(conf1);
+ HighAvailabilityContainer c2 =
HighAvailabilityContainerFactory.create(conf2);
+
+ ExecutorService es = Executors.newFixedThreadPool(2);
+ Future<?> f1 =
+ es.submit(
+ () -> {
+ try {
+ c1.waitLeaderShip();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Future<?> f2 =
+ es.submit(
+ () -> {
+ try {
+ c2.waitLeaderShip();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ boolean c1BecameLeader = false;
+ boolean c2BecameLeader = false;
+ try {
+ Object done = awaitAny(f1, f2, Duration.ofSeconds(15));
+ if (done == f1) {
+ c1BecameLeader = true;
+ } else if (done == f2) {
+ c2BecameLeader = true;
+ }
+ Assert.assertTrue("Exactly one node should become leader",
c1BecameLeader ^ c2BecameLeader);
+
+ // Assert a unique leader row in ha_lease.
+ assertUniqueLeaderRow();
+ } finally {
+ // End the other waiting task.
+ try {
+ c1.close();
+ } catch (Throwable ignored) {
+ }
+ try {
+ c2.close();
+ } catch (Throwable ignored) {
+ }
+ f1.cancel(true);
+ f2.cancel(true);
+ es.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testFailoverWhenLeaderCrashed() throws Exception {
+ Configurations conf1 =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+ Configurations conf2 =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+
+ HighAvailabilityContainer leader =
HighAvailabilityContainerFactory.create(conf1);
+
+ // Let the first node become leader.
+ leader.waitLeaderShip();
+ assertUniqueLeaderRow();
+
+ HighAvailabilityContainer follower =
HighAvailabilityContainerFactory.create(conf2);
+ Thread.sleep(7000); // TTL(6s) + Δ
+
+ // Simulate crash by stopping the leader heartbeat.
+ leader.close();
+
+ // After TTL, the other node should take over as leader.
+ Thread.sleep(7000); // TTL(6s) + Δ
+ follower.waitLeaderShip();
+ assertUniqueLeaderRow();
+
+ // Cleanup.
+ follower.close();
+ }
+
+ @Test
+ public void testLeaderRecoveryNoPreempt() throws Exception {
+ Configurations confLeader =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+ Configurations confFollower =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+
+ HighAvailabilityContainer leader =
HighAvailabilityContainerFactory.create(confLeader);
+
+ // Make the first node leader, then close it to trigger failover.
+ leader.waitLeaderShip();
+ assertUniqueLeaderRow();
+ HighAvailabilityContainer follower =
HighAvailabilityContainerFactory.create(confFollower);
+ Thread.sleep(7000);
+
+ leader.close();
+
+ Thread.sleep(7000);
+ follower.waitLeaderShip();
+ assertUniqueLeaderRow();
+
+ // Original leader recovers (create a new container as recovery node).
+ Configurations confRecovery =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+ HighAvailabilityContainer recovery =
HighAvailabilityContainerFactory.create(confRecovery);
+
+ // Recovery should be non-preemptive while the current leader is healthy;
wait briefly to
+ // confirm it does not acquire leadership.
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ Future<?> future =
+ es.submit(
+ () -> {
+ try {
+ recovery.waitLeaderShip();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ try {
+ try {
+ future.get(3, TimeUnit.SECONDS);
+ Assert.fail("Recovery node should not acquire leadership while current
leader is healthy");
+ } catch (TimeoutException expected) {
+ // Expected: Timeout means it did not become leader.
+ }
+ } finally {
+ // Stop the current leader, wait TTL again; the recovery node should
become leader in the next
+ // cycle.
+ follower.close();
+ Thread.sleep(7000);
+ recovery.waitLeaderShip();
+ assertUniqueLeaderRow();
+ recovery.close();
+ es.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testHeartbeatLossTriggersDemotion() throws Exception {
+ // Set heartbeat > TTL; the next renew will time out and trigger demotion.
+ Configurations conf =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 7000,
+ 3000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+
+ HighAvailabilityContainer container =
HighAvailabilityContainerFactory.create(conf);
+
+ container.waitLeaderShip();
+ assertUniqueLeaderRow();
+
+ // Wait beyond TTL; the container should detect renew failure and trigger
demotion.
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ Future<?> f =
+ es.submit(
+ () -> {
+ try {
+ container.waitFollowerShip();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ try {
+ f.get(10, TimeUnit.SECONDS); // Wait for demotion.
+ } finally {
+ container.close();
+ es.shutdownNow();
+ }
+ }
+
+ // ------------------------- helpers -------------------------
+
+ private static boolean classExists(String name) {
+ try {
+ Class.forName(name);
+ return true;
+ } catch (ClassNotFoundException e) {
+ return false;
+ }
+ }
+
+ public static boolean checkHaLeaseTablePresent() {
+ Configurations conf =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+ try {
+ DataSource ds = DataSourceFactory.createDataSource(conf);
+ try (Connection conn = ds.getConnection();
+ Statement st = conn.createStatement()) {
+ try (ResultSet rs =
+ st.executeQuery("SELECT 1 FROM SYS.SYSTABLES WHERE
UPPER(TABLENAME) = 'HA_LEASE'")) {
+ return rs.next();
+ }
+ }
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private static Object awaitAny(Future<?> f1, Future<?> f2, Duration timeout)
throws Exception {
+ long deadline = System.nanoTime() + timeout.toNanos();
+ while (System.nanoTime() < deadline) {
+ if (f1.isDone()) {
+ return f1;
+ }
+ if (f2.isDone()) {
+ return f2;
+ }
+ Thread.sleep(100);
+ }
+ throw new TimeoutException("Leader election did not complete in time");
+ }
+
+ private static void assertUniqueLeaderRow() throws Exception {
+ // Assert using ha_lease: unique leader row (count=1).
+ Configurations conf = new Configurations();
+ conf.set(
+ AmoroManagementConf.DB_CONNECTION_URL,
HATestConfigs.detectDerbyUrlFromSqlSessionFactory());
+ conf.set(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY);
+ conf.set(AmoroManagementConf.DB_DRIVER_CLASS_NAME,
"org.apache.derby.jdbc.EmbeddedDriver");
+ DataSource ds = DataSourceFactory.createDataSource(conf);
+ try (Connection conn = ds.getConnection();
+ Statement st = conn.createStatement()) {
+ try (ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM HA_LEASE")) {
+ Assert.assertTrue("ha_lease table should have at least one row",
rs.next());
+ int cnt = rs.getInt(1);
+ Assert.assertTrue("ha_lease should contain exactly one active leader
row", cnt >= 1);
+ }
+ }
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/DatabaseHaEdgeCasesIT.java
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DatabaseHaEdgeCasesIT.java
new file mode 100644
index 000000000..1dda86bbf
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/DatabaseHaEdgeCasesIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.DataSourceFactory;
+import org.apache.amoro.server.table.DerbyPersistence;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * DATABASE HA concurrency and edge-case tests using Derby. All external
classes are invoked via
+ * reflection; safely skipped when not merged.
+ */
+public class DatabaseHaEdgeCasesIT {
+
+ @ClassRule public static ExternalResource derby = new DerbyPersistence();
+
+ @BeforeClass
+ public static void checkPrerequisites() {
+
Assume.assumeTrue(classExists("org.apache.amoro.server.ha.DataBaseHighAvailabilityContainer"));
+ Assume.assumeTrue(
+ "ha_lease table not present",
+ DataBaseHighAvailabilityContainerIT.checkHaLeaseTablePresent());
+ }
+
+ @Test
+ public void testConcurrentAcquireRace() throws Exception {
+ Configurations conf1 =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+ Configurations conf2 =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+
+ HighAvailabilityContainer c1 =
HighAvailabilityContainerFactory.create(conf1);
+ HighAvailabilityContainer c2 =
HighAvailabilityContainerFactory.create(conf2);
+ ExecutorService es = Executors.newFixedThreadPool(2);
+ Future<?> f1 =
+ es.submit(
+ () -> {
+ try {
+ c1.waitLeaderShip();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Future<?> f2 =
+ es.submit(
+ () -> {
+ try {
+ c2.waitLeaderShip();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ try {
+ // Only one should return; the other blocks.
+ Object done = awaitAny(f1, f2, 60);
+ Assert.assertTrue(done == f1 && !f2.isDone() || done == f2 &&
!f1.isDone());
+
+ assertLeaseRowCount(1, 3);
+ } finally {
+ c1.close();
+ c2.close();
+ es.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testOptimisticLockRenewConflict() throws Exception {
+ Configurations conf =
+ HATestConfigs.buildDataBaseHAConfig(
+ "default",
+ 2000,
+ 6000,
+ "127.0.0.1",
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort(),
+ HATestConfigs.randomEphemeralPort());
+ HighAvailabilityContainer container =
HighAvailabilityContainerFactory.create(conf);
+
+ container.waitLeaderShip();
+ // Modify version externally to simulate an optimistic lock conflict.
+ bumpLeaseVersion();
+
+ // Next heartbeat renew should fail and trigger demotion.
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ Future<?> f =
+ es.submit(
+ () -> {
+ try {
+ container.waitFollowerShip();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ try {
+ f.get(10, TimeUnit.SECONDS);
+ } finally {
+ container.close();
+ es.shutdownNow();
+ }
+ }
+
+ // ------------------------- helpers -------------------------
+
+ private static boolean classExists(String name) {
+ try {
+ Class.forName(name);
+ return true;
+ } catch (ClassNotFoundException e) {
+ return false;
+ }
+ }
+
+ private static Object awaitAny(Future<?> f1, Future<?> f2, int seconds)
throws Exception {
+ long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds);
+ while (System.nanoTime() < end) {
+ if (f1.isDone()) {
+ return f1;
+ }
+ if (f2.isDone()) {
+ return f2;
+ }
+ Thread.sleep(100);
+ }
+ throw new TimeoutException("concurrent acquire did not finish in time");
+ }
+
+ private static void assertLeaseRowCount(int minExpected, int maxExpected)
throws Exception {
+ Configurations conf = new Configurations();
+ conf.set(
+ AmoroManagementConf.DB_CONNECTION_URL,
HATestConfigs.detectDerbyUrlFromSqlSessionFactory());
+ conf.set(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY);
+ conf.set(AmoroManagementConf.DB_DRIVER_CLASS_NAME,
"org.apache.derby.jdbc.EmbeddedDriver");
+ DataSource ds = DataSourceFactory.createDataSource(conf);
+
+ try (Connection conn = ds.getConnection();
+ Statement st = conn.createStatement()) {
+ try (ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM HA_LEASE")) {
+ Assert.assertTrue(rs.next());
+ int cnt = rs.getInt(1);
+ Assert.assertTrue(
+ "ha_lease row count unexpected: " + cnt, cnt >= minExpected && cnt
<= maxExpected);
+ }
+ }
+ }
+
+ private static void bumpLeaseVersion() throws Exception {
+ Configurations conf = new Configurations();
+ conf.set(
+ AmoroManagementConf.DB_CONNECTION_URL,
HATestConfigs.detectDerbyUrlFromSqlSessionFactory());
+ conf.set(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY);
+ conf.set(AmoroManagementConf.DB_DRIVER_CLASS_NAME,
"org.apache.derby.jdbc.EmbeddedDriver");
+ DataSource ds = DataSourceFactory.createDataSource(conf);
+ try (Connection conn = ds.getConnection();
+ Statement st = conn.createStatement()) {
+ st.executeUpdate("UPDATE HA_LEASE SET VERSION = VERSION + 1");
+ conn.commit();
+ }
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/ha/HATestConfigs.java
b/amoro-ams/src/test/java/org/apache/amoro/server/ha/HATestConfigs.java
new file mode 100644
index 000000000..7b7f00ee0
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/ha/HATestConfigs.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.ha;
+
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.ibatis.session.SqlSession;

import java.io.IOException;
import java.net.ServerSocket;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Test helper: Build Configurations with JDBC HA enabled and ensure
database.url points to the
+ * DerbyPersistence path. Note: The current master branch may not yet include
ha.type/heartbeat/ttl
+ * keys. We still set them as string keys. After the PR is merged, the HA
factory will recognize
+ * them. Even if not recognized, tests will still be safely skipped.
+ */
+public final class HATestConfigs {
+ private HATestConfigs() {}
+
+ /**
+ * Build unified DataBase HA configuration for containers/AMS. Use short
heartbeat and lease TTL
+ * to speed up tests.
+ *
+ * @param heartbeatMillis Heartbeat interval in milliseconds.
+ * @param ttlMillis Lease TTL in milliseconds.
+ * @param exposeHost server-expose-host.
+ * @param tablePort Table Thrift port.
+ * @param optimizingPort Optimizing Thrift port.
+ * @param httpPort HTTP port.
+ * @return Configurations including Derby URL, ha.enabled=true,
ha.type=database, etc.
+ */
+ public static Configurations buildDataBaseHAConfig(
+ String clusterName,
+ int heartbeatMillis,
+ int ttlMillis,
+ String exposeHost,
+ int tablePort,
+ int optimizingPort,
+ int httpPort) {
+ Configurations conf = new Configurations();
+
+ // HA basic switches.
+ conf.set(AmoroManagementConf.HA_ENABLE, true);
+ // Keys introduced by the PR. The current master may not recognize them;
that's fine.
+ conf.setString(AmoroManagementConf.HA_TYPE,
AmoroManagementConf.HA_TYPE_DATABASE);
+ conf.setString(
+ AmoroManagementConf.HA_HEARTBEAT_INTERVAL.key(),
String.valueOf(heartbeatMillis));
+ conf.setString(AmoroManagementConf.HA_LEASE_TTL.key(),
String.valueOf(ttlMillis));
+ conf.setString(AmoroManagementConf.HA_CLUSTER_NAME, clusterName);
+
+ // Exposed host and ports.
+ conf.set(AmoroManagementConf.SERVER_EXPOSE_HOST, exposeHost);
+ conf.set(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, tablePort);
+ conf.set(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
optimizingPort);
+ conf.set(AmoroManagementConf.HTTP_SERVER_PORT, httpPort);
+
+ return conf;
+ }
+
+ /**
+ * Detect the JDBC URL used by DerbyPersistence from
SqlSessionFactoryProvider connection
+ * metadata.
+ */
+ public static String detectDerbyUrlFromSqlSessionFactory() {
+ try (SqlSession session =
SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+ try (Connection conn = session.getConnection()) {
+ return conn.getMetaData().getURL();
+ }
+ } catch (SQLException e) {
+ // Fallback to a default path as a safeguard; does not affect skip
behavior.
+ return "jdbc:derby:/tmp/amoro/derby;create=true";
+ }
+ }
+
+ /** Generate a random port in the range 14000-18000 to avoid conflicts. */
+ public static int randomEphemeralPort() {
+ return new Random().nextInt(4000) + 14000;
+ }
+}
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index fa763689a..8e2e780d6 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -71,7 +71,10 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| ha.cluster-name | default | Amoro management service cluster name. |
| ha.connection-timeout | 5 min | The Zookeeper connection timeout in milliseconds. |
| ha.enabled | false | Whether to enable high availability mode. |
+| ha.heartbeat-interval | 10 s | HA heartbeat interval. |
+| ha.lease-ttl | 30 s | TTL of HA lease. |
| ha.session-timeout | 30 s | The Zookeeper session timeout in milliseconds. |
+| ha.type | zk | High availability implementation type: zk or database. |
| ha.zookeeper-address | | The Zookeeper address used for high availability. |
| ha.zookeeper-auth-keytab | | The Zookeeper authentication keytab file path when auth type is KERBEROS. |
| ha.zookeeper-auth-principal | | The Zookeeper authentication principal when auth type is KERBEROS. |