This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit bbc445a0a7b9b0a4c5380e5b8864a6e9206ee02a
Author: whenzhou.zc <[email protected]>
AuthorDate: Mon Mar 2 17:03:52 2026 +0800

    [server] Support Coordinator High Availability
---
 .../exception/NotCoordinatorLeaderException.java   |  32 ++
 .../java/org/apache/fluss/metrics/MetricNames.java |   1 +
 .../java/org/apache/fluss/rpc/protocol/Errors.java |   7 +-
 .../server/coordinator/CoordinatorContext.java     |  18 +
 .../coordinator/CoordinatorEventProcessor.java     |  44 ++-
 .../coordinator/CoordinatorLeaderElection.java     | 210 +++++++++++
 .../server/coordinator/CoordinatorServer.java      | 196 +++++++++-
 .../server/coordinator/CoordinatorService.java     |  79 +++-
 .../coordinator/RequireCoordinatorLeader.java      |  48 +++
 .../coordinator/event/CoordinatorEventManager.java |   9 +
 .../event/DeadCoordinatorEvent.java}               |  39 +-
 .../event/NewCoordinatorEvent.java}                |  39 +-
 .../event/watcher/CoordinatorChangeWatcher.java    |  89 +++++
 .../event/watcher/ServerBaseChangeWatcher.java     |  66 ++++
 .../event/watcher/TabletServerChangeWatcher.java   |  45 +--
 .../apache/fluss/server/zk/ZooKeeperClient.java    |  33 +-
 .../fluss/server/zk/data/CoordinatorAddress.java   |   2 +-
 .../org/apache/fluss/server/zk/data/ZkData.java    |  48 ++-
 .../org/apache/fluss/server/ServerTestBase.java    |  15 +-
 .../coordinator/CoordinatorEventProcessorTest.java |   2 +-
 .../server/coordinator/CoordinatorHAITCase.java    | 419 +++++++++++++++++++++
 .../coordinator/CoordinatorServerElectionTest.java | 153 ++++++++
 .../coordinator/CoordinatorServerITCase.java       |   1 +
 .../server/coordinator/CoordinatorServerTest.java  |  19 +-
 ...Test.java => CoordinatorChangeWatcherTest.java} |  43 +--
 .../watcher/TabletServerChangeWatcherTest.java     |   2 +-
 .../server/testutils/FlussClusterExtension.java    |  11 +
 .../fluss/server/zk/ZooKeeperClientTest.java       |   4 +-
 .../maintenance/observability/monitor-metrics.md   |  11 +-
 29 files changed, 1536 insertions(+), 149 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java
new file mode 100644
index 000000000..a3f65aaf4
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/** Exception thrown when a request is sent to a standby coordinator server. 
since: 0.9 */
+public class NotCoordinatorLeaderException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotCoordinatorLeaderException(String message) {
+        super(message);
+    }
+
+    public NotCoordinatorLeaderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index bf6440dec..b0fcaeecb 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -36,6 +36,7 @@ public class MetricNames {
     // metrics for coordinator server
     // 
--------------------------------------------------------------------------------------------
     public static final String ACTIVE_COORDINATOR_COUNT = 
"activeCoordinatorCount";
+    public static final String ALIVE_COORDINATOR_COUNT = 
"aliveCoordinatorCount";
     public static final String ACTIVE_TABLET_SERVER_COUNT = 
"activeTabletServerCount";
     public static final String OFFLINE_BUCKET_COUNT = "offlineBucketCount";
     public static final String TABLE_COUNT = "tableCount";
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index dd16d10e2..0ab13e6b7 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -56,6 +56,7 @@ import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.exception.NetworkException;
 import org.apache.fluss.exception.NoRebalanceInProgressException;
 import org.apache.fluss.exception.NonPrimaryKeyTableException;
+import org.apache.fluss.exception.NotCoordinatorLeaderException;
 import org.apache.fluss.exception.NotEnoughReplicasAfterAppendException;
 import org.apache.fluss.exception.NotEnoughReplicasException;
 import org.apache.fluss.exception.NotLeaderOrFollowerException;
@@ -247,7 +248,11 @@ public enum Errors {
             63,
             "The client has attempted to perform an operation with an invalid 
producer ID.",
             InvalidProducerIdException::new),
-    CONFIG_EXCEPTION(64, "A configuration error occurred.", 
ConfigException::new);
+    CONFIG_EXCEPTION(64, "A configuration error occurred.", 
ConfigException::new),
+    NOT_COORDINATOR_LEADER_EXCEPTION(
+            65,
+            "The coordinator is not a leader and cannot process request.",
+            NotCoordinatorLeaderException::new);
 
     private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
index bece7d9dc..2b821b02d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
@@ -67,6 +67,7 @@ public class CoordinatorContext {
     // a success deletion.
     private final Map<TableBucketReplica, Integer> failDeleteNumbers = new 
HashMap<>();
 
+    private final Set<String> liveCoordinatorServers = new HashSet<>();
     private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>();
     private final Set<Integer> shuttingDownTabletServers = new HashSet<>();
 
@@ -115,6 +116,23 @@ public class CoordinatorContext {
         return coordinatorEpoch;
     }
 
+    public Set<String> getLiveCoordinatorServers() {
+        return liveCoordinatorServers;
+    }
+
+    public void setLiveCoordinators(Set<String> servers) {
+        liveCoordinatorServers.clear();
+        liveCoordinatorServers.addAll(servers);
+    }
+
+    public void addLiveCoordinator(String serverId) {
+        this.liveCoordinatorServers.add(serverId);
+    }
+
+    public void removeLiveCoordinator(String serverId) {
+        this.liveCoordinatorServers.remove(serverId);
+    }
+
     public Map<Integer, ServerInfo> getLiveTabletServers() {
         return liveTabletServers;
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index ce6f7f336..4668593ea 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -71,6 +71,7 @@ import 
org.apache.fluss.server.coordinator.event.CoordinatorEvent;
 import org.apache.fluss.server.coordinator.event.CoordinatorEventManager;
 import org.apache.fluss.server.coordinator.event.CreatePartitionEvent;
 import org.apache.fluss.server.coordinator.event.CreateTableEvent;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
 import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent;
 import 
org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent;
 import org.apache.fluss.server.coordinator.event.DropPartitionEvent;
@@ -78,6 +79,7 @@ import 
org.apache.fluss.server.coordinator.event.DropTableEvent;
 import org.apache.fluss.server.coordinator.event.EventProcessor;
 import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent;
 import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
 import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
 import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
 import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent;
@@ -86,6 +88,7 @@ import 
org.apache.fluss.server.coordinator.event.RebalanceEvent;
 import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
 import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
 import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
+import 
org.apache.fluss.server.coordinator.event.watcher.CoordinatorChangeWatcher;
 import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
 import 
org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
 import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager;
@@ -172,6 +175,7 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
     private final LakeTableTieringManager lakeTableTieringManager;
     private final TableChangeWatcher tableChangeWatcher;
     private final CoordinatorChannelManager coordinatorChannelManager;
+    private final CoordinatorChangeWatcher coordinatorChangeWatcher;
     private final TabletServerChangeWatcher tabletServerChangeWatcher;
     private final CoordinatorMetadataCache serverMetadataCache;
     private final CoordinatorRequestBatch coordinatorRequestBatch;
@@ -224,6 +228,8 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                         tableBucketStateMachine,
                         new RemoteStorageCleaner(conf, ioExecutor),
                         ioExecutor);
+        this.coordinatorChangeWatcher =
+                new CoordinatorChangeWatcher(zooKeeperClient, 
coordinatorEventManager);
         this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, 
coordinatorEventManager);
         this.tabletServerChangeWatcher =
                 new TabletServerChangeWatcher(zooKeeperClient, 
coordinatorEventManager);
@@ -263,6 +269,7 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
     public void startup() {
         
coordinatorContext.setCoordinatorServerInfo(getCoordinatorServerInfo());
         // start watchers first so that we won't miss node in zk;
+        coordinatorChangeWatcher.start();
         tabletServerChangeWatcher.start();
         tableChangeWatcher.start();
         LOG.info("Initializing coordinator context.");
@@ -306,12 +313,9 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
     private ServerInfo getCoordinatorServerInfo() {
         try {
             return zooKeeperClient
-                    .getCoordinatorAddress()
+                    .getCoordinatorLeaderAddress()
                     .map(
                             coordinatorAddress ->
-                                    // TODO we set id to 0 as that 
CoordinatorServer don't support
-                                    // HA, if we support HA, we need to set id 
to the config
-                                    // CoordinatorServer id to avoid node 
drift.
                                     new ServerInfo(
                                             0,
                                             null, // For coordinatorServer, no 
rack info
@@ -334,6 +338,11 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
     private void initCoordinatorContext() throws Exception {
         long start = System.currentTimeMillis();
+        // get all coordinator servers
+        List<String> currentCoordinatorServers = 
zooKeeperClient.getCoordinatorServerList();
+        coordinatorContext.setLiveCoordinators(new 
HashSet<>(currentCoordinatorServers));
+        LOG.info("Load coordinator servers success when initializing 
coordinator context.");
+
         // get all tablet server's
         int[] currentServers = zooKeeperClient.getSortedTabletServerList();
         List<ServerInfo> tabletServerInfos = new ArrayList<>();
@@ -548,6 +557,7 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         tableManager.shutdown();
 
         // then stop watchers
+        coordinatorChangeWatcher.stop();
         tableChangeWatcher.stop();
         tabletServerChangeWatcher.stop();
     }
@@ -572,6 +582,10 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                     (NotifyLeaderAndIsrResponseReceivedEvent) event);
         } else if (event instanceof DeleteReplicaResponseReceivedEvent) {
             
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) 
event);
+        } else if (event instanceof NewCoordinatorEvent) {
+            processNewCoordinator((NewCoordinatorEvent) event);
+        } else if (event instanceof DeadCoordinatorEvent) {
+            processDeadCoordinator((DeadCoordinatorEvent) event);
         } else if (event instanceof NewTabletServerEvent) {
             processNewTabletServer((NewTabletServerEvent) event);
         } else if (event instanceof DeadTabletServerEvent) {
@@ -984,6 +998,28 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         replicaStateMachine.handleStateChanges(offlineReplicas, 
OfflineReplica);
     }
 
+    private void processNewCoordinator(NewCoordinatorEvent 
newCoordinatorEvent) {
+        String coordinatorServerId = newCoordinatorEvent.getServerId();
+        if 
(coordinatorContext.getLiveCoordinatorServers().contains(coordinatorServerId)) {
+            return;
+        }
+
+        // process new coordinator server
+        LOG.info("New coordinator server callback for coordinator server {}", 
coordinatorServerId);
+
+        coordinatorContext.addLiveCoordinator(coordinatorServerId);
+    }
+
+    private void processDeadCoordinator(DeadCoordinatorEvent 
deadCoordinatorEvent) {
+        String coordinatorServerId = deadCoordinatorEvent.getServerId();
+        if 
(!coordinatorContext.getLiveCoordinatorServers().contains(coordinatorServerId)) 
{
+            return;
+        }
+        // process dead coordinator server
+        LOG.info("Coordinator server failure callback for {}.", 
coordinatorServerId);
+        coordinatorContext.removeLiveCoordinator(coordinatorServerId);
+    }
+
     private void processNewTabletServer(NewTabletServerEvent 
newTabletServerEvent) {
         // NOTE: we won't need to detect bounced tablet servers like Kafka as 
we won't
         // miss the event of tablet server un-register and register again 
since we can
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
new file mode 100644
index 000000000..9f09df6af
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * Used by coordinator servers. Coordinator servers listen to the ZK election 
node and elect a leader.
+ *
+ * <p>This class manages the leader election lifecycle:
+ *
+ * <ul>
+ *   <li>Start election and participate as a candidate
+ *   <li>When elected as leader, invoke the initialization callback
+ *   <li>When losing leadership, clean up leader resources but continue 
participating in election
+ *   <li>Can be re-elected as leader multiple times
+ * </ul>
+ */
+public class CoordinatorLeaderElection implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorLeaderElection.class);
+
+    private final String serverId;
+    private final LeaderLatch leaderLatch;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+    private final CompletableFuture<Void> leaderReadyFuture = new 
CompletableFuture<>();
+    // Single-threaded executor to run leader init callbacks outside Curator's 
EventThread.
+    // Curator's LeaderLatchListener callbacks run on its internal 
EventThread; performing
+    // synchronous ZK operations there causes deadlock because ZK response 
dispatch also
+    // needs that same thread.
+    private final ExecutorService leaderCallbackExecutor;
+    // Tracks the pending cleanup task so that init can wait for it to 
complete.
+    private final AtomicReference<CompletableFuture<Void>> pendingCleanup =
+            new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private volatile Runnable initLeaderServices;
+    private volatile Consumer<Throwable> cleanupLeaderServices;
+
+    public CoordinatorLeaderElection(ZooKeeperClient zkClient, String 
serverId) {
+        this.serverId = serverId;
+        this.leaderLatch =
+                new LeaderLatch(
+                        zkClient.getCuratorClient(),
+                        ZkData.CoordinatorElectionZNode.path(),
+                        String.valueOf(serverId));
+        this.leaderCallbackExecutor =
+                Executors.newSingleThreadExecutor(
+                        r -> {
+                            Thread t = new Thread(r, 
"coordinator-leader-callback-" + serverId);
+                            t.setDaemon(true);
+                            return t;
+                        });
+    }
+
+    /**
+     * Starts the leader election process asynchronously. The returned future 
completes when this
+     * server becomes the leader for the first time and initializes the leader 
services.
+     *
+     * <p>After the first election, the server will continue to participate in 
future elections.
+     * When re-elected as leader, the initLeaderServices callback will be 
invoked again.
+     *
+     * @param initLeaderServices the callback to initialize leader services 
once elected
+     * @param cleanupLeaderServices the callback to clean up leader services 
when losing leadership
+     * @return a CompletableFuture that completes when this server becomes 
leader for the first time
+     */
+    public CompletableFuture<Void> startElectLeaderAsync(
+            Runnable initLeaderServices, Consumer<Throwable> 
cleanupLeaderServices) {
+        this.initLeaderServices = initLeaderServices;
+        this.cleanupLeaderServices = cleanupLeaderServices;
+
+        leaderLatch.addListener(
+                new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        LOG.info("Coordinator server {} has become the 
leader.", serverId);
+                        // Capture the pending cleanup future at this point so 
that
+                        // init waits for it before proceeding.
+                        CompletableFuture<Void> cleanup = pendingCleanup.get();
+                        // Run init on a separate thread to avoid deadlock with
+                        // Curator's EventThread when performing ZK operations.
+                        leaderCallbackExecutor.execute(
+                                () -> {
+                                    // Wait for any pending cleanup to finish 
first.
+                                    try {
+                                        cleanup.get(60, TimeUnit.SECONDS);
+                                    } catch (TimeoutException e) {
+                                        LOG.warn(
+                                                "Pending cleanup for server {} 
did not complete within 60s, proceeding with init.",
+                                                serverId);
+                                    } catch (Exception e) {
+                                        LOG.warn(
+                                                "Error waiting for pending 
cleanup for server {}",
+                                                serverId,
+                                                e);
+                                    }
+                                    try {
+                                        initLeaderServices.run();
+                                        leaderReadyFuture.complete(null);
+                                    } catch (Exception e) {
+                                        LOG.error(
+                                                "Failed to initialize leader 
services for server {}",
+                                                serverId,
+                                                e);
+                                        
leaderReadyFuture.completeExceptionally(e);
+                                    }
+                                });
+                        isLeader.set(true);
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        if (isLeader.compareAndSet(true, false)) {
+                            LOG.warn(
+                                    "Coordinator server {} has lost the 
leadership, cleaning up leader services.",
+                                    serverId);
+                            // Run cleanup on a separate daemon thread (NOT on 
the
+                            // leaderCallbackExecutor) to avoid blocking init 
tasks.
+                            // The cleanup completion is tracked via 
pendingCleanup so
+                            // that subsequent init waits for it.
+                            CompletableFuture<Void> cleanupFuture = new 
CompletableFuture<>();
+                            pendingCleanup.set(cleanupFuture);
+                            Thread cleanupThread =
+                                    new Thread(
+                                            () -> {
+                                                try {
+                                                    if (cleanupLeaderServices 
!= null) {
+                                                        
cleanupLeaderServices.accept(null);
+                                                    }
+                                                } catch (Exception e) {
+                                                    LOG.error(
+                                                            "Failed to cleanup 
leader services for server {}",
+                                                            serverId,
+                                                            e);
+                                                } finally {
+                                                    
cleanupFuture.complete(null);
+                                                }
+                                            },
+                                            "coordinator-leader-cleanup-" + 
serverId);
+                            cleanupThread.setDaemon(true);
+                            cleanupThread.start();
+                        }
+                    }
+                });
+
+        try {
+            leaderLatch.start();
+            LOG.info("Coordinator server {} started leader election.", 
serverId);
+        } catch (Exception e) {
+            LOG.error("Failed to start LeaderLatch for server {}", serverId, 
e);
+            leaderReadyFuture.completeExceptionally(
+                    new RuntimeException("Leader election start failed", e));
+        }
+
+        return leaderReadyFuture;
+    }
+
+    @Override
+    public void close() {
+        LOG.info("Closing LeaderLatch for server {}.", serverId);
+
+        if (leaderLatch != null) {
+            try {
+                leaderLatch.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close LeaderLatch for server {}.", 
serverId, e);
+            }
+        }
+
+        leaderCallbackExecutor.shutdownNow();
+
+        // Complete the future exceptionally if it hasn't been completed yet
+        if (!leaderReadyFuture.isDone()) {
+            leaderReadyFuture.completeExceptionally(
+                    new RuntimeException("Leader election closed for server " 
+ serverId));
+        }
+    }
+
+    public boolean isLeader() {
+        return this.isLeader.get();
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 1f9f3306e..2262b971b 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -92,8 +92,7 @@ public class CoordinatorServer extends ServerBase {
     private final AtomicBoolean isShutDown = new AtomicBoolean(false);
     private final Clock clock;
 
-    @GuardedBy("lock")
-    private String serverId;
+    private final String serverId;
 
     @GuardedBy("lock")
     private MetricRegistry metricRegistry;
@@ -116,6 +115,9 @@ public class CoordinatorServer extends ServerBase {
     @GuardedBy("lock")
     private CoordinatorMetadataCache metadataCache;
 
+    @GuardedBy("lock")
+    private MetadataManager metadataManager;
+
     @GuardedBy("lock")
     private CoordinatorChannelManager coordinatorChannelManager;
 
@@ -147,6 +149,12 @@ public class CoordinatorServer extends ServerBase {
     @GuardedBy("lock")
     private LakeCatalogDynamicLoader lakeCatalogDynamicLoader;
 
+    @GuardedBy("lock")
+    private CoordinatorLeaderElection coordinatorLeaderElection;
+
+    @GuardedBy("lock")
+    private CompletableFuture<Void> leaderElectionFuture;
+
     @GuardedBy("lock")
     private KvSnapshotLeaseManager kvSnapshotLeaseManager;
 
@@ -158,6 +166,7 @@ public class CoordinatorServer extends ServerBase {
         super(conf);
         validateCoordinatorConfigs(conf);
         this.terminationFuture = new CompletableFuture<>();
+        this.serverId = UUID.randomUUID().toString();
         this.clock = clock;
     }
 
@@ -170,10 +179,41 @@ public class CoordinatorServer extends ServerBase {
 
     @Override
     protected void startServices() throws Exception {
+        electCoordinatorLeaderAsync();
+    }
+
+    private CompletableFuture<Void> electCoordinatorLeaderAsync() throws 
Exception {
+        initCoordinatorStandby();
+
+        // start election (coordinatorLeaderElection is created inside 
initCoordinatorStandby
+        // after zkClient is initialized)
+        this.leaderElectionFuture =
+                coordinatorLeaderElection.startElectLeaderAsync(
+                        () -> {
+                            try {
+                                initCoordinatorLeader();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        },
+                        (Throwable t) -> {
+                            try {
+                                cleanupCoordinatorLeader();
+                            } catch (Exception e) {
+                                LOG.error("Failed to cleanup coordinator 
leader services", e);
+                            }
+                        });
+        return leaderElectionFuture;
+    }
+
+    protected void initCoordinatorStandby() throws Exception {
+        // When a coordinator server starts, it first becomes a standby.
+        // This method executes initialization for standby, opening the 
necessary RPC server port.
+        // The corresponding RPC methods will reject requests from clients
+        // and only serve health checks.
         synchronized (lock) {
-            LOG.info("Initializing Coordinator services.");
+            LOG.info("Initializing Coordinator services as standby.");
             List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, 
ServerType.COORDINATOR);
-            this.serverId = UUID.randomUUID().toString();
 
             // for metrics
             this.metricRegistry = MetricRegistry.create(conf, pluginManager);
@@ -186,6 +226,9 @@ public class CoordinatorServer extends ServerBase {
 
             this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
 
+            // CoordinatorLeaderElection must be created after zkClient is 
initialized.
+            this.coordinatorLeaderElection = new 
CoordinatorLeaderElection(zkClient, serverId);
+
             this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, 
pluginManager, true);
             this.dynamicConfigManager = new DynamicConfigManager(zkClient, 
conf, true);
 
@@ -206,8 +249,7 @@ public class CoordinatorServer extends ServerBase {
                     new LakeTableTieringManager(
                             new LakeTieringMetricGroup(metricRegistry, 
serverMetricGroup));
 
-            MetadataManager metadataManager =
-                    new MetadataManager(zkClient, conf, 
lakeCatalogDynamicLoader);
+            this.metadataManager = new MetadataManager(zkClient, conf, 
lakeCatalogDynamicLoader);
             this.ioExecutor =
                     Executors.newFixedThreadPool(
                             conf.get(ConfigOptions.SERVER_IO_POOL_SIZE),
@@ -237,7 +279,8 @@ public class CoordinatorServer extends ServerBase {
                             lakeTableTieringManager,
                             dynamicConfigManager,
                             ioExecutor,
-                            kvSnapshotLeaseManager);
+                            kvSnapshotLeaseManager,
+                            coordinatorLeaderElection);
 
             this.rpcServer =
                     RpcServer.create(
@@ -249,11 +292,15 @@ public class CoordinatorServer extends ServerBase {
                                     serverMetricGroup));
             rpcServer.start();
 
-            registerCoordinatorLeader();
-            // when init session, register coordinator server again
+            registerCoordinatorServer();
             ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
-                    zkClient, this::registerCoordinatorLeader, this);
+                    zkClient, this::registerCoordinatorServer, this);
+        }
+    }
+
+    protected void initCoordinatorLeader() throws Exception {
 
+        synchronized (lock) {
             this.clientMetricGroup = new ClientMetricGroup(metricRegistry, 
SERVER_NAME);
             this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
 
@@ -263,6 +310,7 @@ public class CoordinatorServer extends ServerBase {
                     new AutoPartitionManager(metadataCache, metadataManager, 
conf);
             autoPartitionManager.start();
 
+            registerCoordinatorLeader();
             // start coordinator event processor after we register coordinator 
leader to zk
             // so that the event processor can get the coordinator leader node 
from zk during start
             // up.
@@ -287,6 +335,72 @@ public class CoordinatorServer extends ServerBase {
         }
     }
 
+    /**
+     * Cleans up leader-specific resources when this server loses leadership.
+     *
+     * <p>This method is called by {@link CoordinatorLeaderElection} when the 
server transitions
+     * from leader to standby. It cleans up leader-only resources while 
keeping the server running
+     * as a standby, ready to participate in future elections.
+     */
+    protected void cleanupCoordinatorLeader() throws Exception {
+        synchronized (lock) {
+            LOG.info("Cleaning up coordinator leader services.");
+
+            // Clean up leader-specific resources in reverse order of 
initialization
+            try {
+                if (coordinatorEventProcessor != null) {
+                    coordinatorEventProcessor.shutdown();
+                    coordinatorEventProcessor = null;
+                }
+            } catch (Throwable t) {
+                LOG.warn("Failed to shutdown coordinator event processor", t);
+            }
+
+            try {
+                if (coordinatorChannelManager != null) {
+                    coordinatorChannelManager.close();
+                    coordinatorChannelManager = null;
+                }
+            } catch (Throwable t) {
+                LOG.warn("Failed to close coordinator channel manager", t);
+            }
+
+            try {
+                if (autoPartitionManager != null) {
+                    autoPartitionManager.close();
+                    autoPartitionManager = null;
+                }
+            } catch (Throwable t) {
+                LOG.warn("Failed to close auto partition manager", t);
+            }
+
+            try {
+                if (rpcClient != null) {
+                    rpcClient.close();
+                    rpcClient = null;
+                }
+            } catch (Throwable t) {
+                LOG.warn("Failed to close RPC client", t);
+            }
+
+            try {
+                if (clientMetricGroup != null) {
+                    clientMetricGroup.close();
+                    clientMetricGroup = null;
+                }
+            } catch (Throwable t) {
+                LOG.warn("Failed to close client metric group", t);
+            }
+
+            // Reset coordinator context for next election
+            if (coordinatorContext != null) {
+                coordinatorContext.resetContext();
+            }
+
+            LOG.info("Coordinator leader services cleaned up successfully.");
+        }
+    }
+
     @Override
     protected CompletableFuture<Result> closeAsync(Result result) {
         if (isShutDown.compareAndSet(false, true)) {
@@ -306,6 +420,45 @@ public class CoordinatorServer extends ServerBase {
         return terminationFuture;
     }
 
+    private void registerCoordinatorServer() throws Exception {
+        long startTime = System.currentTimeMillis();
+        List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints();
+        CoordinatorAddress coordinatorAddress =
+                new CoordinatorAddress(
+                        this.serverId, 
Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf));
+
+        // we need to retry registration since, although
+        // the zkClient reconnects, the ephemeral node may still exist
+        // for a while; retry and wait until the ephemeral node is removed
+        // see ZOOKEEPER-2985
+        while (true) {
+            try {
+                zkClient.registerCoordinatorServer(coordinatorAddress);
+                break;
+            } catch (KeeperException.NodeExistsException nodeExistsException) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) {
+                    LOG.error(
+                            "Coordinator Server register to Zookeeper exceeded 
total retry time of {} ms. "
+                                    + "Aborting registration attempts.",
+                            ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS);
+                    throw nodeExistsException;
+                }
+
+                LOG.warn(
+                        "Coordinator server already registered in Zookeeper. "
+                                + "retrying register after {} ms....",
+                        ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+                try {
+                    Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+                } catch (InterruptedException interruptedException) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+    }
+
     private void registerCoordinatorLeader() throws Exception {
         long startTime = System.currentTimeMillis();
         List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints();
@@ -488,6 +641,14 @@ public class CoordinatorServer extends ServerBase {
                 exception = ExceptionUtils.firstOrSuppressed(t, exception);
             }
 
+            try {
+                if (coordinatorLeaderElection != null) {
+                    coordinatorLeaderElection.close();
+                }
+            } catch (Throwable t) {
+                exception = ExceptionUtils.firstOrSuppressed(t, exception);
+            }
+
             try {
                 if (zkClient != null) {
                     zkClient.close();
@@ -525,6 +686,11 @@ public class CoordinatorServer extends ServerBase {
         return coordinatorService;
     }
 
+    @VisibleForTesting
+    public CompletableFuture<Void> getLeaderElectionFuture() {
+        return leaderElectionFuture;
+    }
+
     @Override
     protected String getServerName() {
         return SERVER_NAME;
@@ -535,6 +701,11 @@ public class CoordinatorServer extends ServerBase {
         return rpcServer;
     }
 
+    @VisibleForTesting
+    public String getServerId() {
+        return serverId;
+    }
+
     @VisibleForTesting
     public ServerMetadataCache getMetadataCache() {
         return metadataCache;
@@ -553,4 +724,9 @@ public class CoordinatorServer extends ServerBase {
     public RebalanceManager getRebalanceManager() {
         return coordinatorEventProcessor.getRebalanceManager();
     }
+
+    @VisibleForTesting
+    public ZooKeeperClient getZooKeeperClient() {
+        return zkClient;
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 1cce283af..7f9d92c77 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -35,6 +35,7 @@ import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.NonPrimaryKeyTableException;
+import org.apache.fluss.exception.NotCoordinatorLeaderException;
 import org.apache.fluss.exception.SecurityDisabledException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
@@ -241,6 +242,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     private final LakeTableHelper lakeTableHelper;
     private final ProducerOffsetsManager producerOffsetsManager;
     private final KvSnapshotLeaseManager kvSnapshotLeaseManager;
+    private final CoordinatorLeaderElection coordinatorLeaderElection;
 
     public CoordinatorService(
             Configuration conf,
@@ -254,7 +256,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
             LakeTableTieringManager lakeTableTieringManager,
             DynamicConfigManager dynamicConfigManager,
             ExecutorService ioExecutor,
-            KvSnapshotLeaseManager kvSnapshotLeaseManager) {
+            KvSnapshotLeaseManager kvSnapshotLeaseManager,
+            CoordinatorLeaderElection coordinatorLeaderElection) {
         super(
                 remoteFileSystem,
                 ServerType.COORDINATOR,
@@ -283,6 +286,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         this.producerOffsetsManager.start();
 
         this.kvSnapshotLeaseManager = kvSnapshotLeaseManager;
+        this.coordinatorLeaderElection = coordinatorLeaderElection;
     }
 
     @Override
@@ -290,6 +294,21 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return "coordinator";
     }
 
+    /**
+     * Checks whether this coordinator server is the current leader.
+     *
+     * <p>This method should be called at the beginning of every method 
annotated with {@link
+     * RequireCoordinatorLeader} to guard against requests being processed by 
a standby coordinator.
+     *
+     * @throws NotCoordinatorLeaderException if this server is not the current 
coordinator leader
+     */
+    private void checkLeader() {
+        if (!coordinatorLeaderElection.isLeader()) {
+            throw new NotCoordinatorLeaderException(
+                    "This coordinator server is not the current leader.");
+        }
+    }
+
     @Override
     public void shutdown() {
         IOUtils.closeQuietly(producerOffsetsManager, "producer snapshot 
manager");
@@ -348,8 +367,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         }
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreateDatabaseResponse> 
createDatabase(CreateDatabaseRequest request) {
+        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.CREATE, 
Resource.cluster());
         }
@@ -372,6 +393,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AlterDatabaseResponse> 
alterDatabase(AlterDatabaseRequest request) {
         String databaseName = request.getDatabaseName();
@@ -416,6 +438,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
 
     @Override
     public CompletableFuture<DropDatabaseResponse> 
dropDatabase(DropDatabaseRequest request) {
+        checkLeader();
         authorizeDatabase(OperationType.DROP, request.getDatabaseName());
         DropDatabaseResponse response = new DropDatabaseResponse();
         metadataManager.dropDatabase(
@@ -423,8 +446,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreateTableResponse> 
createTable(CreateTableRequest request) {
+        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         tablePath.validate();
         authorizeDatabase(OperationType.CREATE, tablePath.getDatabaseName());
@@ -493,8 +518,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(new CreateTableResponse());
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest 
request) {
+        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         tablePath.validate();
         authorizeTable(OperationType.ALTER, tablePath);
@@ -653,8 +680,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return Boolean.parseBoolean(dataLakeEnabledValue);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropTableResponse> dropTable(DropTableRequest 
request) {
+        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         authorizeTable(OperationType.DROP, tablePath);
 
@@ -663,9 +692,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreatePartitionResponse> createPartition(
             CreatePartitionRequest request) {
+        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         authorizeTable(OperationType.WRITE, tablePath);
 
@@ -707,8 +738,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropPartitionResponse> 
dropPartition(DropPartitionRequest request) {
+        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         authorizeTable(OperationType.WRITE, tablePath);
 
@@ -729,8 +762,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<MetadataResponse> metadata(MetadataRequest 
request) {
+        checkLeader();
         String listenerName = currentListenerName();
         Session session = currentSession();
 
@@ -749,7 +784,9 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return metadataResponseAccessContextEvent.getResultFuture();
     }
 
+    @RequireCoordinatorLeader
     public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest 
request) {
+        checkLeader();
         CompletableFuture<AdjustIsrResponse> response = new 
CompletableFuture<>();
         eventManagerSupplier
                 .get()
@@ -757,9 +794,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CommitKvSnapshotResponse> commitKvSnapshot(
             CommitKvSnapshotRequest request) {
+        checkLeader();
         CompletableFuture<CommitKvSnapshotResponse> response = new 
CompletableFuture<>();
         // parse completed snapshot from request
         byte[] completedSnapshotBytes = request.getCompletedSnapshot();
@@ -774,9 +813,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CommitRemoteLogManifestResponse> 
commitRemoteLogManifest(
             CommitRemoteLogManifestRequest request) {
+        checkLeader();
         CompletableFuture<CommitRemoteLogManifestResponse> response = new 
CompletableFuture<>();
         eventManagerSupplier
                 .get()
@@ -786,8 +827,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreateAclsResponse> createAcls(CreateAclsRequest 
request) {
+        checkLeader();
         if (authorizer == null) {
             throw new SecurityDisabledException("No Authorizer is 
configured.");
         }
@@ -796,8 +839,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return 
CompletableFuture.completedFuture(makeCreateAclsResponse(aclCreateResults));
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest 
request) {
+        checkLeader();
         if (authorizer == null) {
             throw new SecurityDisabledException("No Authorizer is 
configured.");
         }
@@ -806,9 +851,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return 
CompletableFuture.completedFuture(makeDropAclsResponse(aclDeleteResults));
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<PrepareLakeTableSnapshotResponse> 
prepareLakeTableSnapshot(
             PrepareLakeTableSnapshotRequest request) {
+        checkLeader();
         CompletableFuture<PrepareLakeTableSnapshotResponse> future = new 
CompletableFuture<>();
         boolean ignorePreviousBucketOffsets =
                 request.hasIgnorePreviousTableOffsets() && 
request.isIgnorePreviousTableOffsets();
@@ -857,9 +904,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return future;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CommitLakeTableSnapshotResponse> 
commitLakeTableSnapshot(
             CommitLakeTableSnapshotRequest request) {
+        checkLeader();
         CompletableFuture<CommitLakeTableSnapshotResponse> response = new 
CompletableFuture<>();
         eventManagerSupplier
                 .get()
@@ -869,9 +918,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<LakeTieringHeartbeatResponse> 
lakeTieringHeartbeat(
             LakeTieringHeartbeatRequest request) {
+        checkLeader();
         LakeTieringHeartbeatResponse heartbeatResponse = new 
LakeTieringHeartbeatResponse();
         int currentCoordinatorEpoch = coordinatorEpochSupplier.get();
         heartbeatResponse.setCoordinatorEpoch(currentCoordinatorEpoch);
@@ -945,9 +996,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(heartbeatResponse);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
             ControlledShutdownRequest request) {
+        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.cluster());
         }
@@ -963,9 +1016,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AcquireKvSnapshotLeaseResponse> 
acquireKvSnapshotLease(
             AcquireKvSnapshotLeaseRequest request) {
+        checkLeader();
         for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
                 request.getSnapshotsToLeasesList()) {
             long tableId = kvSnapshotLeaseForTable.getTableId();
@@ -997,9 +1052,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<ReleaseKvSnapshotLeaseResponse> 
releaseKvSnapshotLease(
             ReleaseKvSnapshotLeaseRequest request) {
+        checkLeader();
         for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
             long tableId = tableBucket.getTableId();
             if (authorizer != null) {
@@ -1030,9 +1087,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropKvSnapshotLeaseResponse> dropKvSnapshotLease(
             DropKvSnapshotLeaseRequest request) {
+        checkLeader();
         String leaseId = request.getLeaseId();
         // Capture session before entering async block since currentSession() 
is thread-local
         Session session = authorizer != null ? currentSession() : null;
@@ -1069,9 +1128,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs(
             AlterClusterConfigsRequest request) {
+        checkLeader();
         CompletableFuture<AlterClusterConfigsResponse> future = new 
CompletableFuture<>();
         List<PbAlterConfig> infos = request.getAlterConfigsList();
         if (infos.isEmpty()) {
@@ -1112,8 +1173,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return future;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AddServerTagResponse> 
addServerTag(AddServerTagRequest request) {
+        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.cluster());
         }
@@ -1131,9 +1194,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<RemoveServerTagResponse> removeServerTag(
             RemoveServerTagRequest request) {
+        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.cluster());
         }
@@ -1151,8 +1216,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest 
request) {
+        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.WRITE, 
Resource.cluster());
         }
@@ -1166,9 +1233,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<ListRebalanceProgressResponse> 
listRebalanceProgress(
             ListRebalanceProgressRequest request) {
+        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.DESCRIBE, 
Resource.cluster());
         }
@@ -1183,9 +1252,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CancelRebalanceResponse> cancelRebalance(
             CancelRebalanceRequest request) {
+        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.WRITE, 
Resource.cluster());
         }
@@ -1296,9 +1367,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     // Producer Offset Management APIs (for Exactly-Once Semantics)
     // 
==================================================================================
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<RegisterProducerOffsetsResponse> 
registerProducerOffsets(
             RegisterProducerOffsetsRequest request) {
+        checkLeader();
         // Authorization: require WRITE permission on all tables in the request
         if (authorizer != null) {
             for (PbProducerTableOffsets tableOffsets : 
request.getTableOffsetsList()) {
@@ -1342,9 +1415,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets(
             GetProducerOffsetsRequest request) {
+        checkLeader();
         String producerId = request.getProducerId();
         // Capture session before entering async block since currentSession() 
is thread-local
         Session session = authorizer != null ? currentSession() : null;
@@ -1401,9 +1476,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
+    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DeleteProducerOffsetsResponse> 
deleteProducerOffsets(
             DeleteProducerOffsetsRequest request) {
+        checkLeader();
         // Capture session before entering async block since currentSession() 
is thread-local
         Session session = authorizer != null ? currentSession() : null;
         return CompletableFuture.supplyAsync(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java
new file mode 100644
index 000000000..cbf0178a3
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to mark RPC methods that require the coordinator server to be 
the current leader
+ * before execution.
+ *
+ * <p>When a method annotated with {@code @RequireCoordinatorLeader} is 
invoked, it will first check
+ * whether this coordinator server is the current leader via {@link
+ * CoordinatorLeaderElection#isLeader()}. If the server is not the leader, a 
{@link
+ * org.apache.fluss.exception.NotCoordinatorLeaderException} will be thrown 
immediately, and the
+ * actual method body will not be executed.
+ *
+ * <p>Usage example:
+ *
+ * <pre>{@code
+ * @RequireCoordinatorLeader
+ * public CompletableFuture<CreateTableResponse> 
createTable(CreateTableRequest request) {
+ *     // method body only executes when this server is the coordinator leader
+ * }
+ * }</pre>
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RequireCoordinatorLeader {}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
index ad22f6d1c..74a49a0da 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
@@ -62,6 +62,7 @@ public final class CoordinatorEventManager implements 
EventManager {
     private Histogram eventQueueTime;
 
     // Coordinator metrics moved from CoordinatorEventProcessor
+    private volatile int aliveCoordinatorServerCount;
     private volatile int tabletServerCount;
     private volatile int offlineBucketCount;
     private volatile int tableCount;
@@ -88,6 +89,8 @@ public final class CoordinatorEventManager implements 
EventManager {
 
         // Register coordinator metrics
         coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () 
-> 1);
+        coordinatorMetricGroup.gauge(
+                MetricNames.ALIVE_COORDINATOR_COUNT, () -> 
aliveCoordinatorServerCount);
         coordinatorMetricGroup.gauge(
                 MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> 
tabletServerCount);
         coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> 
offlineBucketCount);
@@ -105,6 +108,7 @@ public final class CoordinatorEventManager implements 
EventManager {
         AccessContextEvent<MetricsData> accessContextEvent =
                 new AccessContextEvent<>(
                         context -> {
+                            int coordinatorServerCount = 
context.getLiveCoordinatorServers().size();
                             int tabletServerCount = 
context.getLiveTabletServers().size();
                             int tableCount = context.allTables().size();
                             int lakeTableCount = context.getLakeTableCount();
@@ -138,6 +142,7 @@ public final class CoordinatorEventManager implements 
EventManager {
                             }
 
                             return new MetricsData(
+                                    coordinatorServerCount,
                                     tabletServerCount,
                                     tableCount,
                                     lakeTableCount,
@@ -152,6 +157,7 @@ public final class CoordinatorEventManager implements 
EventManager {
         // Wait for the result and update local metrics
         try {
             MetricsData metricsData = 
accessContextEvent.getResultFuture().get();
+            this.aliveCoordinatorServerCount = 
metricsData.coordinatorServerCount;
             this.tabletServerCount = metricsData.tabletServerCount;
             this.tableCount = metricsData.tableCount;
             this.lakeTableCount = metricsData.lakeTableCount;
@@ -275,6 +281,7 @@ public final class CoordinatorEventManager implements 
EventManager {
     }
 
     private static class MetricsData {
+        private final int coordinatorServerCount;
         private final int tabletServerCount;
         private final int tableCount;
         private final int lakeTableCount;
@@ -284,6 +291,7 @@ public final class CoordinatorEventManager implements 
EventManager {
         private final int replicasToDeleteCount;
 
         public MetricsData(
+                int coordinatorServerCount,
                 int tabletServerCount,
                 int tableCount,
                 int lakeTableCount,
@@ -291,6 +299,7 @@ public final class CoordinatorEventManager implements 
EventManager {
                 int partitionCount,
                 int offlineBucketCount,
                 int replicasToDeleteCount) {
+            this.coordinatorServerCount = coordinatorServerCount;
             this.tabletServerCount = tabletServerCount;
             this.tableCount = tableCount;
             this.lakeTableCount = lakeTableCount;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorEvent.java
similarity index 54%
copy from 
fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
copy to 
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorEvent.java
index 5953f3366..17d17b0bc 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorEvent.java
@@ -15,51 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.server.zk.data;
+package org.apache.fluss.server.coordinator.event;
 
-import org.apache.fluss.cluster.Endpoint;
-
-import java.util.List;
 import java.util.Objects;
 
-/**
- * The address information of an active coordinator stored in {@link 
ZkData.CoordinatorZNode}.
- *
- * @see CoordinatorAddressJsonSerde for json serialization and deserialization.
- */
-public class CoordinatorAddress {
-    private final String id;
-    private final List<Endpoint> endpoints;
+/** An event for a coordinator server becoming dead. */
+public class DeadCoordinatorEvent implements CoordinatorEvent {
 
-    public CoordinatorAddress(String id, List<Endpoint> endpoints) {
-        this.id = id;
-        this.endpoints = endpoints;
-    }
+    private final String serverId;
 
-    public String getId() {
-        return id;
+    public DeadCoordinatorEvent(String serverId) {
+        this.serverId = serverId;
     }
 
-    public List<Endpoint> getEndpoints() {
-        return endpoints;
+    public String getServerId() {
+        return serverId;
     }
 
     @Override
     public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        CoordinatorAddress that = (CoordinatorAddress) o;
-        return Objects.equals(id, that.id) && Objects.equals(endpoints, 
that.endpoints);
+        DeadCoordinatorEvent that = (DeadCoordinatorEvent) o;
+        return Objects.equals(serverId, that.serverId);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, endpoints);
+        return Objects.hash(serverId);
     }
 
     @Override
     public String toString() {
-        return "CoordinatorAddress{" + "id='" + id + '\'' + ", endpoints=" + 
endpoints + '}';
+        return "DeadCoordinatorEvent{" + "serverId=" + serverId + '}';
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorEvent.java
similarity index 54%
copy from 
fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
copy to 
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorEvent.java
index 5953f3366..da6dcf2b5 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorEvent.java
@@ -15,51 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.server.zk.data;
+package org.apache.fluss.server.coordinator.event;
 
-import org.apache.fluss.cluster.Endpoint;
-
-import java.util.List;
 import java.util.Objects;
 
-/**
- * The address information of an active coordinator stored in {@link 
ZkData.CoordinatorZNode}.
- *
- * @see CoordinatorAddressJsonSerde for json serialization and deserialization.
- */
-public class CoordinatorAddress {
-    private final String id;
-    private final List<Endpoint> endpoints;
+/** An event for a new coordinator server. */
+public class NewCoordinatorEvent implements CoordinatorEvent {
 
-    public CoordinatorAddress(String id, List<Endpoint> endpoints) {
-        this.id = id;
-        this.endpoints = endpoints;
-    }
+    private final String serverId;
 
-    public String getId() {
-        return id;
+    public NewCoordinatorEvent(String serverId) {
+        this.serverId = serverId;
     }
 
-    public List<Endpoint> getEndpoints() {
-        return endpoints;
+    public String getServerId() {
+        return serverId;
     }
 
     @Override
     public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        CoordinatorAddress that = (CoordinatorAddress) o;
-        return Objects.equals(id, that.id) && Objects.equals(endpoints, 
that.endpoints);
+        NewCoordinatorEvent that = (NewCoordinatorEvent) o;
+        return Objects.equals(serverId, that.serverId);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, endpoints);
+        return Objects.hash(serverId);
     }
 
     @Override
     public String toString() {
-        return "CoordinatorAddress{" + "id='" + id + '\'' + ", endpoints=" + 
endpoints + '}';
+        return "NewCoordinatorEvent{" + "serverId=" + serverId + '}';
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java
new file mode 100644
index 000000000..2f30f3ac0
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes (new/delete) in 
zookeeper. */
+public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorChangeWatcher.class);
+
+    public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, 
EventManager eventManager) {
+        super(zooKeeperClient, eventManager, 
ZkData.CoordinatorIdsZNode.path());
+    }
+
+    @Override
+    protected CuratorCacheListener createListener() {
+        return new CoordinatorChangeListener();
+    }
+
+    @Override
+    protected String getWatcherName() {
+        return "CoordinatorChangeWatcher";
+    }
+
+    protected String getServerIdFromEvent(ChildData data) {
+        return ZKPaths.getNodeFromPath(data.getPath());
+    }
+
+    private final class CoordinatorChangeListener implements 
CuratorCacheListener {
+
+        @Override
+        public void event(Type type, ChildData oldData, ChildData newData) {
+            if (newData != null) {
+                LOG.debug("Received {} event (path: {})", type, 
newData.getPath());
+            } else {
+                LOG.debug("Received {} event", type);
+            }
+
+            switch (type) {
+                case NODE_CREATED:
+                    {
+                        if (newData != null && newData.getData().length > 0) {
+                            String serverId = getServerIdFromEvent(newData);
+                            LOG.info("Received CHILD_ADDED event for server 
{}.", serverId);
+                            eventManager.put(new 
NewCoordinatorEvent(serverId));
+                        }
+                        break;
+                    }
+                case NODE_DELETED:
+                    {
+                        if (oldData != null && oldData.getData().length > 0) {
+                            String serverId = getServerIdFromEvent(oldData);
+                            LOG.info("Received CHILD_REMOVED event for server 
{}.", serverId);
+                            eventManager.put(new 
DeadCoordinatorEvent(serverId));
+                        }
+                        break;
+                    }
+                default:
+                    break;
+            }
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/ServerBaseChangeWatcher.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/ServerBaseChangeWatcher.java
new file mode 100644
index 000000000..e9f82c95b
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/ServerBaseChangeWatcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract base class for server change watchers, shared by {@link
+ * CoordinatorChangeWatcher} and {@link TabletServerChangeWatcher}.
+ */
+public abstract class ServerBaseChangeWatcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ServerBaseChangeWatcher.class);
+
+    protected final CuratorCache curatorCache;
+    protected final EventManager eventManager;
+    protected volatile boolean running;
+
+    public ServerBaseChangeWatcher(
+            ZooKeeperClient zooKeeperClient, EventManager eventManager, String 
zkPath) {
+        this.curatorCache = 
CuratorCache.build(zooKeeperClient.getCuratorClient(), zkPath);
+        this.eventManager = eventManager;
+        this.curatorCache.listenable().addListener(createListener());
+    }
+
+    /** Creates the listener for server change events. */
+    protected abstract CuratorCacheListener createListener();
+
+    /** Returns the watcher name for logging. */
+    protected abstract String getWatcherName();
+
+    public void start() {
+        running = true;
+        curatorCache.start();
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        running = false;
+        LOG.info("Stopping {}", getWatcherName());
+        curatorCache.close();
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java
index a8f4dcb15..569d5117a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java
@@ -28,7 +28,6 @@ import 
org.apache.fluss.server.zk.data.TabletServerRegistration;
 import org.apache.fluss.server.zk.data.ZkData.ServerIdZNode;
 import org.apache.fluss.server.zk.data.ZkData.ServerIdsZNode;
 import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
-import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache;
 import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
 
@@ -36,34 +35,31 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** A watcher to watch the tablet server changes(new/delete) in zookeeper. */
-public class TabletServerChangeWatcher {
+public class TabletServerChangeWatcher extends ServerBaseChangeWatcher {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TabletServerChangeWatcher.class);
-    private final CuratorCache curatorCache;
-
-    private volatile boolean running;
-
-    private final EventManager eventManager;
 
     public TabletServerChangeWatcher(ZooKeeperClient zooKeeperClient, 
EventManager eventManager) {
-        this.curatorCache =
-                CuratorCache.build(zooKeeperClient.getCuratorClient(), 
ServerIdsZNode.path());
-        this.eventManager = eventManager;
-        this.curatorCache.listenable().addListener(new 
TabletServerChangeListener());
+        super(zooKeeperClient, eventManager, ServerIdsZNode.path());
     }
 
-    public void start() {
-        running = true;
-        curatorCache.start();
+    @Override
+    protected CuratorCacheListener createListener() {
+        return new TabletServerChangeListener();
     }
 
-    public void stop() {
-        if (!running) {
-            return;
+    @Override
+    protected String getWatcherName() {
+        return "TabletServerChangeWatcher";
+    }
+
+    protected int getServerIdFromEvent(ChildData data) {
+        try {
+            return Integer.parseInt(ZKPaths.getNodeFromPath(data.getPath()));
+        } catch (NumberFormatException e) {
+            throw new FlussRuntimeException(
+                    "Invalid server id in zookeeper path: " + data.getPath(), 
e);
         }
-        running = false;
-        LOG.info("Stopping TableChangeWatcher");
-        curatorCache.close();
     }
 
     private final class TabletServerChangeListener implements 
CuratorCacheListener {
@@ -108,13 +104,4 @@ public class TabletServerChangeWatcher {
             }
         }
     }
-
-    private int getServerIdFromEvent(ChildData data) {
-        try {
-            return Integer.parseInt(ZKPaths.getNodeFromPath(data.getPath()));
-        } catch (NumberFormatException e) {
-            throw new FlussRuntimeException(
-                    "Invalid server id in zookeeper path: " + data.getPath(), 
e);
-        }
-    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index 749257409..78667ca48 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -62,7 +62,6 @@ import 
org.apache.fluss.server.zk.data.ZkData.BucketRemoteLogsZNode;
 import org.apache.fluss.server.zk.data.ZkData.BucketSnapshotIdZNode;
 import org.apache.fluss.server.zk.data.ZkData.BucketSnapshotsZNode;
 import org.apache.fluss.server.zk.data.ZkData.ConfigEntityZNode;
-import org.apache.fluss.server.zk.data.ZkData.CoordinatorZNode;
 import org.apache.fluss.server.zk.data.ZkData.DatabaseZNode;
 import org.apache.fluss.server.zk.data.ZkData.DatabasesZNode;
 import org.apache.fluss.server.zk.data.ZkData.KvSnapshotLeaseZNode;
@@ -188,20 +187,38 @@ public class ZooKeeperClient implements AutoCloseable {
     // Coordinator server
     // 
--------------------------------------------------------------------------------------------
 
-    /** Register a coordinator leader server to ZK. */
+    /** Register a coordinator server to ZK. */
+    public void registerCoordinatorServer(CoordinatorAddress 
coordinatorAddress) throws Exception {
+        String path = 
ZkData.CoordinatorIdZNode.path(coordinatorAddress.getId());
+        zkClient.create()
+                .creatingParentsIfNeeded()
+                .withMode(CreateMode.EPHEMERAL)
+                .forPath(path, 
ZkData.CoordinatorIdZNode.encode(coordinatorAddress));
+        LOG.info("Registered Coordinator server {} at path {}.", 
coordinatorAddress, path);
+    }
+
+    /** Register a coordinator leader to ZK. */
     public void registerCoordinatorLeader(CoordinatorAddress 
coordinatorAddress) throws Exception {
-        String path = CoordinatorZNode.path();
+        String path = ZkData.CoordinatorLeaderZNode.path();
         zkClient.create()
                 .creatingParentsIfNeeded()
                 .withMode(CreateMode.EPHEMERAL)
-                .forPath(path, CoordinatorZNode.encode(coordinatorAddress));
-        LOG.info("Registered leader {} at path {}.", coordinatorAddress, path);
+                .forPath(path, 
ZkData.CoordinatorLeaderZNode.encode(coordinatorAddress));
+        LOG.info("Registered Coordinator leader {} at path {}.", 
coordinatorAddress, path);
     }
 
     /** Get the leader address registered in ZK. */
-    public Optional<CoordinatorAddress> getCoordinatorAddress() throws 
Exception {
-        Optional<byte[]> bytes = getOrEmpty(CoordinatorZNode.path());
-        return bytes.map(CoordinatorZNode::decode);
+    public Optional<CoordinatorAddress> getCoordinatorLeaderAddress() throws 
Exception {
+        Optional<byte[]> bytes = 
getOrEmpty(ZkData.CoordinatorLeaderZNode.path());
+        return bytes.map(
+                data ->
+                        // may be an empty node when a leader is elected but 
not yet registered
+                        data.length == 0 ? null : 
ZkData.CoordinatorLeaderZNode.decode(data));
+    }
+
+    /** Gets the list of coordinator server Ids. */
+    public List<String> getCoordinatorServerList() throws Exception {
+        return getChildren(ZkData.CoordinatorIdsZNode.path());
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
index 5953f3366..7759e66b8 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * The address information of an active coordinator stored in {@link 
ZkData.CoordinatorZNode}.
+ * The address information of an active coordinator stored in {@link 
ZkData.CoordinatorLeaderZNode}.
  *
  * @see CoordinatorAddressJsonSerde for json serialization and deserialization.
  */
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
index aed5de0b3..6e007fdfc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
@@ -305,13 +305,53 @@ public final class ZkData {
     // 
------------------------------------------------------------------------------------------
 
     /**
-     * The znode for the active coordinator. The znode path is:
+     * The znode for alive coordinators. The znode path is:
      *
-     * <p>/coordinators/active
+     * <p>/coordinators/ids
+     */
+    public static final class CoordinatorIdsZNode {
+        public static String path() {
+            return "/coordinators/ids";
+        }
+    }
+
+    /**
+     * The znode for a registered Coordinator information. The znode path is:
+     *
+     * <p>/coordinators/ids/[serverId]
+     */
+    public static final class CoordinatorIdZNode {
+        public static String path(String serverId) {
+            return CoordinatorIdsZNode.path() + "/" + serverId;
+        }
+
+        public static byte[] encode(CoordinatorAddress coordinatorAddress) {
+            return JsonSerdeUtils.writeValueAsBytes(
+                    coordinatorAddress, CoordinatorAddressJsonSerde.INSTANCE);
+        }
+
+        public static CoordinatorAddress decode(byte[] json) {
+            return JsonSerdeUtils.readValue(json, 
CoordinatorAddressJsonSerde.INSTANCE);
+        }
+    }
+
+    /**
+     * The znode for the coordinator leader election. The znode path is:
+     *
+     * <p>/coordinators/election
+     */
+    public static final class CoordinatorElectionZNode {
+        public static String path() {
+            return "/coordinators/election";
+        }
+    }
+
+    /**
+     * The znode for the active coordinator leader. The znode path is:
      *
-     * <p>Note: introduce standby coordinators in the future for znode 
"/coordinators/standby/".
+     * <p>/coordinators/active (path kept for backward compatibility)
      */
-    public static final class CoordinatorZNode {
+    public static final class CoordinatorLeaderZNode {
         public static String path() {
             return "/coordinators/active";
         }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java 
b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
index 2504aebed..c71f0f96c 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java
@@ -25,7 +25,7 @@ import org.apache.fluss.server.coordinator.CoordinatorServer;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
-import org.apache.fluss.server.zk.data.ZkData.CoordinatorZNode;
+import org.apache.fluss.server.zk.data.ZkData;
 import org.apache.fluss.server.zk.data.ZkData.ServerIdZNode;
 import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
@@ -82,10 +82,13 @@ public abstract class ServerTestBase {
     @Test
     void testExceptionWhenRunServer() throws Exception {
         ServerBase server = getStartFailServer();
-        assertThatThrownBy(server::start)
-                .isInstanceOf(FlussException.class)
-                .hasMessage(String.format("Failed to start the %s.", 
server.getServerName()));
-        server.close();
+        try {
+            assertThatThrownBy(server::start)
+                    .isInstanceOf(FlussException.class)
+                    .hasMessage(String.format("Failed to start the %s.", 
server.getServerName()));
+        } finally {
+            server.close();
+        }
     }
 
     @Test
@@ -94,7 +97,7 @@ public abstract class ServerTestBase {
         // get the EPHEMERAL node of server
         String path =
                 server instanceof CoordinatorServer
-                        ? CoordinatorZNode.path()
+                        ? ZkData.CoordinatorIdZNode.path(((CoordinatorServer) 
server).getServerId())
                         : 
ServerIdZNode.path(server.conf.getInt(ConfigOptions.TABLET_SERVER_ID));
 
         long oldNodeCtime = zookeeperClient.getStat(path).get().getCtime();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 0d304df1e..997a8738f 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -417,7 +417,7 @@ class CoordinatorEventProcessorTest {
         BucketState t1Bucket0State = fromCtx(ctx -> 
ctx.getBucketState(t1Bucket0));
         assertThat(t1Bucket0State).isEqualTo(OnlineBucket);
         // t1 bucket 1 should reelect a leader since the leader is not alive
-        // the bucket whose leader is in the server should be online a again, 
but the leadership
+        // the bucket whose leader is in the server should be online again, 
but the leadership
         // should change the leader for bucket2 of t1 should change since the 
leader fail
         BucketState t1Bucket1State = fromCtx(ctx -> 
ctx.getBucketState(t1Bucket1));
         assertThat(t1Bucket1State).isEqualTo(OnlineBucket);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java
new file mode 100644
index 000000000..66cc39332
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.cluster.Endpoint;
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.NotCoordinatorLeaderException;
+import org.apache.fluss.rpc.GatewayClientProxy;
+import org.apache.fluss.rpc.RpcClient;
+import org.apache.fluss.rpc.gateway.CoordinatorGateway;
+import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.CoordinatorAddress;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.Watcher;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration test for Coordinator Server High Availability.
+ *
+ * <p>This test verifies that when multiple coordinator servers are started, 
only the leader
+ * processes RPC requests while standby servers return {@link 
NotCoordinatorLeaderException}.
+ *
+ * <p>It also tests the complete leader election lifecycle: participate in 
election -> become leader
+ * (initialize leader services) -> lose leadership (become standby) -> 
re-participate in election ->
+ * become leader again.
+ */
+class CoordinatorHAITCase {
+
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> 
ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    private static ZooKeeperClient zookeeperClient;
+
+    private CoordinatorServer coordinatorServer1;
+    private CoordinatorServer coordinatorServer2;
+    private RpcClient rpcClient;
+
+    @BeforeAll
+    static void baseBeforeAll() {
+        zookeeperClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @BeforeEach
+    void setUp() throws Exception {
+        Configuration clientConf = new Configuration();
+        rpcClient = RpcClient.create(clientConf, 
TestingClientMetricGroup.newInstance(), false);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (coordinatorServer1 != null) {
+            coordinatorServer1.close();
+        }
+        if (coordinatorServer2 != null) {
+            coordinatorServer2.close();
+        }
+        if (rpcClient != null) {
+            rpcClient.close();
+        }
+    }
+
+    @Test
+    void testLeaderAndStandbyRpcBehavior() throws Exception {
+        coordinatorServer1 = new CoordinatorServer(createConfiguration());
+        coordinatorServer2 = new CoordinatorServer(createConfiguration());
+
+        List<CoordinatorServer> coordinatorServerList =
+                Arrays.asList(coordinatorServer1, coordinatorServer2);
+
+        // start 2 coordinator servers
+        for (CoordinatorServer server : coordinatorServerList) {
+            server.start();
+        }
+
+        // wait until one coordinator becomes leader
+        waitUntilCoordinatorServerElected();
+
+        CoordinatorAddress leaderAddress = 
zookeeperClient.getCoordinatorLeaderAddress().get();
+
+        // find the leader and standby servers
+        CoordinatorServer leaderServer = null;
+        CoordinatorServer standbyServer = null;
+        for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+            if (Objects.equals(coordinatorServer.getServerId(), 
leaderAddress.getId())) {
+                leaderServer = coordinatorServer;
+            } else {
+                standbyServer = coordinatorServer;
+            }
+        }
+
+        assertThat(leaderServer).isNotNull();
+        assertThat(standbyServer).isNotNull();
+
+        // create gateways for both servers
+        CoordinatorGateway leaderGateway = 
createGatewayForServer(leaderServer);
+        CoordinatorGateway standbyGateway = 
createGatewayForServer(standbyServer);
+
+        // test: leader should process request normally (no exception means 
success)
+        String testDbName = "test_ha_db_" + System.currentTimeMillis();
+        CreateDatabaseRequest createDbRequest =
+                new 
CreateDatabaseRequest().setDatabaseName(testDbName).setIgnoreIfExists(false);
+
+        // leader should process request successfully without throwing 
exception
+        leaderGateway.createDatabase(createDbRequest).get();
+
+        // test: standby should throw NotCoordinatorLeaderException
+        String testDbName2 = "test_ha_db2_" + System.currentTimeMillis();
+        CreateDatabaseRequest createDbRequest2 =
+                new 
CreateDatabaseRequest().setDatabaseName(testDbName2).setIgnoreIfExists(false);
+
+        assertThatThrownBy(() -> 
standbyGateway.createDatabase(createDbRequest2).get())
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(NotCoordinatorLeaderException.class)
+                .hasMessageContaining("not the current leader");
+    }
+
+    @Test
+    void testLeaderLosesLeadershipAndReElected() throws Exception {
+        coordinatorServer1 = new CoordinatorServer(createConfiguration());
+        coordinatorServer2 = new CoordinatorServer(createConfiguration());
+
+        coordinatorServer1.start();
+        coordinatorServer2.start();
+
+        // Step 1: Initial leader election
+        waitUntilCoordinatorServerElected();
+        CoordinatorAddress firstLeaderAddr = 
zookeeperClient.getCoordinatorLeaderAddress().get();
+
+        CoordinatorServer leader = findServerById(firstLeaderAddr.getId());
+        CoordinatorServer standby = findServerByNotId(firstLeaderAddr.getId());
+        assertThat(leader).isNotNull();
+        assertThat(standby).isNotNull();
+
+        verifyServerIsLeader(leader, "test_reentrant_db_1");
+        verifyServerIsStandby(standby);
+
+        // Step 2: Kill leader's ZK session → standby becomes the new leader
+        killZkSession(leader);
+        waitUntilNewLeaderElected(leader.getServerId());
+        assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId())
+                .as("After killing leader, standby should become leader")
+                .isEqualTo(standby.getServerId());
+
+        verifyServerIsLeader(standby, "test_reentrant_db_2");
+        verifyServerIsStandby(leader);
+
+        // Step 3: Wait for the killed leader to fully reconnect and re-join 
election,
+        // then kill the current leader → the original leader should become 
leader again.
+        // This proves LeaderLatch election is re-entrant.
+        waitUntilServerRegistered(leader);
+        killZkSession(standby);
+        waitUntilNewLeaderElected(standby.getServerId());
+        assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId())
+                .as("Original leader should be re-elected, proving re-entrant 
election")
+                .isEqualTo(leader.getServerId());
+
+        verifyServerIsLeader(leader, "test_reentrant_db_3");
+        createGatewayForServer(leader).metadata(new MetadataRequest()).get();
+    }
+
+    @Test
+    void testMultipleLeadershipLossAndRecovery() throws Exception {
+        coordinatorServer1 = new CoordinatorServer(createConfiguration());
+        coordinatorServer2 = new CoordinatorServer(createConfiguration());
+
+        coordinatorServer1.start();
+        coordinatorServer2.start();
+
+        // Perform multiple leadership loss and recovery cycles.
+        // Each iteration kills the current leader and waits for the other 
server to take over.
+        for (int i = 0; i < 3; i++) {
+            waitUntilCoordinatorServerElected();
+            CoordinatorAddress currentLeaderAddress =
+                    zookeeperClient.getCoordinatorLeaderAddress().get();
+
+            CoordinatorServer currentLeader = 
findServerById(currentLeaderAddress.getId());
+            CoordinatorServer otherServer = 
findServerByNotId(currentLeaderAddress.getId());
+            assertThat(currentLeader).isNotNull();
+
+            verifyServerIsLeader(currentLeader, "test_cycle_db_" + i);
+
+            // Ensure the other server has reconnected and re-joined election
+            // (it may have been killed in a previous iteration)
+            waitUntilServerRegistered(otherServer);
+
+            killZkSession(currentLeader);
+            waitUntilNewLeaderElected(currentLeaderAddress.getId());
+        }
+
+        // Final verification: ensure cluster is still functional
+        CoordinatorAddress finalLeaderAddress = 
zookeeperClient.getCoordinatorLeaderAddress().get();
+        CoordinatorServer finalLeader = 
findServerById(finalLeaderAddress.getId());
+        assertThat(finalLeader).isNotNull();
+        createGatewayForServer(finalLeader).metadata(new 
MetadataRequest()).get();
+    }
+
+    private CoordinatorGateway createGatewayForServer(CoordinatorServer 
server) {
+        List<Endpoint> endpoints = server.getRpcServer().getBindEndpoints();
+        Endpoint endpoint = endpoints.get(0);
+        // Use server.hashCode() as id since serverId is a UUID string
+        ServerNode serverNode =
+                new ServerNode(
+                        server.hashCode(),
+                        endpoint.getHost(),
+                        endpoint.getPort(),
+                        ServerType.COORDINATOR);
+
+        return GatewayClientProxy.createGatewayProxy(
+                () -> serverNode, rpcClient, CoordinatorGateway.class);
+    }
+
+    private static Configuration createConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                ConfigOptions.ZOOKEEPER_ADDRESS,
+                
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString());
+        configuration.setString(
+                ConfigOptions.BIND_LISTENERS, 
"CLIENT://localhost:0,FLUSS://localhost:0");
+        configuration.setString(ConfigOptions.ADVERTISED_LISTENERS, 
"CLIENT://198.168.0.1:100");
+        configuration.set(ConfigOptions.REMOTE_DATA_DIR, 
"/tmp/fluss/remote-data");
+        // Use a shorter timeout for faster test execution; 5 seconds is short
+        // enough for the test while still long enough to avoid spurious
+        // session expirations.
+        configuration.set(ConfigOptions.ZOOKEEPER_SESSION_TIMEOUT, 
Duration.ofSeconds(5));
+        configuration.set(ConfigOptions.ZOOKEEPER_CONNECTION_TIMEOUT, 
Duration.ofSeconds(5));
+        configuration.set(ConfigOptions.ZOOKEEPER_RETRY_WAIT, 
Duration.ofMillis(500));
+        return configuration;
+    }
+
+    private void waitUntilCoordinatorServerElected() {
+        waitUntil(
+                () -> 
zookeeperClient.getCoordinatorLeaderAddress().isPresent(),
+                Duration.ofMinutes(1),
+                "Fail to wait coordinator server elected");
+    }
+
+    private CoordinatorServer findServerById(String serverId) {
+        if (coordinatorServer1 != null && 
coordinatorServer1.getServerId().equals(serverId)) {
+            return coordinatorServer1;
+        }
+        if (coordinatorServer2 != null && 
coordinatorServer2.getServerId().equals(serverId)) {
+            return coordinatorServer2;
+        }
+        return null;
+    }
+
+    private CoordinatorServer findServerByNotId(String serverId) {
+        if (coordinatorServer1 != null && 
!coordinatorServer1.getServerId().equals(serverId)) {
+            return coordinatorServer1;
+        }
+        if (coordinatorServer2 != null && 
!coordinatorServer2.getServerId().equals(serverId)) {
+            return coordinatorServer2;
+        }
+        return null;
+    }
+
+    private static Throwable getRootCause(Throwable t) {
+        if (t instanceof ExecutionException || t instanceof 
CompletionException) {
+            return t.getCause() != null ? t.getCause() : t;
+        }
+        return t;
+    }
+
+    /**
+     * Kills the ZK session of a CoordinatorServer to simulate real session 
timeout.
+     *
+     * <p>This creates a duplicate ZK connection with the same session ID and 
immediately closes it,
+     * which forces the ZK server to expire the original session. This causes:
+     *
+     * <ol>
+     *   <li>All ephemeral nodes for that session to be deleted (LeaderLatch 
node, leader address)
+     *   <li>The original client receives SESSION_EXPIRED event
+     *   <li>Curator fires ConnectionState.LOST
+     *   <li>LeaderLatch calls notLeader() callback
+     *   <li>Curator reconnects with a new session and re-participates in 
election
+     * </ol>
+     */
+    private void killZkSession(CoordinatorServer server) throws Exception {
+        CuratorFramework curatorClient = 
server.getZooKeeperClient().getCuratorClient();
+        ZooKeeper zk = curatorClient.getZookeeperClient().getZooKeeper();
+        long sessionId = zk.getSessionId();
+        byte[] sessionPasswd = zk.getSessionPasswd();
+        String connectString = 
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString();
+
+        // We must wait for the duplicate connection to be fully established 
before
+        // closing it. Otherwise, close() may happen before the TCP connection 
reaches
+        // the ZK server, meaning the server never sees the duplicate session 
and the
+        // original session stays alive (making the test flaky).
+        CountDownLatch connectedLatch = new CountDownLatch(1);
+        ZooKeeper dupZk =
+                new ZooKeeper(
+                        connectString,
+                        1000,
+                        event -> {
+                            if (event.getState() == 
Watcher.Event.KeeperState.SyncConnected) {
+                                connectedLatch.countDown();
+                            }
+                        },
+                        sessionId,
+                        sessionPasswd);
+        if (!connectedLatch.await(10, TimeUnit.SECONDS)) {
+            dupZk.close();
+            throw new RuntimeException(
+                    "Failed to establish duplicate ZK connection for session 
kill");
+        }
+        dupZk.close();
+    }
+
+    /** Waits until a new coordinator leader is elected that is different from 
the given old one. */
+    private void waitUntilNewLeaderElected(String oldLeaderId) {
+        waitUntil(
+                () -> {
+                    try {
+                        return zookeeperClient
+                                .getCoordinatorLeaderAddress()
+                                .map(addr -> !addr.getId().equals(oldLeaderId))
+                                .orElse(false);
+                    } catch (Exception e) {
+                        return false;
+                    }
+                },
+                Duration.ofMinutes(1),
+                "Fail to wait for new coordinator leader to be elected");
+    }
+
+    /** Verifies that the given server can process requests as a leader. */
+    private void verifyServerIsLeader(CoordinatorServer server, String dbName) 
throws Exception {
+        createGatewayForServer(server)
+                .createDatabase(
+                        new 
CreateDatabaseRequest().setDatabaseName(dbName).setIgnoreIfExists(true))
+                .get();
+    }
+
+    /** Verifies that the given server rejects requests as a standby 
(non-leader). */
+    private void verifyServerIsStandby(CoordinatorServer server) {
+        assertThatThrownBy(
+                        () ->
+                                createGatewayForServer(server)
+                                        .createDatabase(
+                                                new CreateDatabaseRequest()
+                                                        .setDatabaseName(
+                                                                
"standby_check_"
+                                                                        + 
System.nanoTime())
+                                                        
.setIgnoreIfExists(false))
+                                        .get())
+                .satisfies(
+                        t ->
+                                assertThat(getRootCause(t))
+                                        
.isInstanceOf(NotCoordinatorLeaderException.class));
+    }
+
+    /**
+     * Waits until a server is registered in ZK (its ephemeral node at 
/coordinators/ids/[serverId]
+     * exists). This ensures the server has reconnected after a session loss 
and re-joined the
+     * election, so it can become leader again when needed.
+     */
+    private void waitUntilServerRegistered(CoordinatorServer server) {
+        String path = "/coordinators/ids/" + server.getServerId();
+        waitUntil(
+                () -> {
+                    try {
+                        return 
zookeeperClient.getCuratorClient().checkExists().forPath(path)
+                                != null;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                },
+                Duration.ofSeconds(30),
+                "Server " + server.getServerId() + " did not re-register in 
ZK");
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java
new file mode 100644
index 000000000..ad89f57ad
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.CoordinatorAddress;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class CoordinatorServerElectionTest {
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> 
ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    protected static ZooKeeperClient zookeeperClient;
+
+    @BeforeAll
+    static void baseBeforeAll() {
+        zookeeperClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @Test
+    void testCoordinatorServerElection() throws Exception {
+        CoordinatorServer coordinatorServer1 = new 
CoordinatorServer(createConfiguration());
+        CoordinatorServer coordinatorServer2 = new 
CoordinatorServer(createConfiguration());
+        CoordinatorServer coordinatorServer3 = new 
CoordinatorServer(createConfiguration());
+
+        List<CoordinatorServer> coordinatorServerList =
+                Arrays.asList(coordinatorServer1, coordinatorServer2, 
coordinatorServer3);
+
+        // start 3 coordinator servers
+        for (int i = 0; i < 3; i++) {
+            CoordinatorServer server = coordinatorServerList.get(i);
+            server.start();
+        }
+
+        // a random coordinator becomes the leader
+        waitUntilCoordinatorServerElected();
+
+        CoordinatorAddress firstLeaderAddress = 
zookeeperClient.getCoordinatorLeaderAddress().get();
+
+        // Find the leader and try to restart it.
+        CoordinatorServer firstLeader = null;
+        for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+            if (Objects.equals(coordinatorServer.getServerId(), 
firstLeaderAddress.getId())) {
+                firstLeader = coordinatorServer;
+                break;
+            }
+        }
+        assertThat(firstLeader).isNotNull();
+        firstLeader.close();
+        firstLeader.start();
+
+        // Then we should get another Coordinator server leader elected
+        waitUntilCoordinatorServerReelected(firstLeaderAddress);
+        CoordinatorAddress secondLeaderAddress =
+                zookeeperClient.getCoordinatorLeaderAddress().get();
+        assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
+
+        CoordinatorServer secondLeader = null;
+        for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+            if (Objects.equals(coordinatorServer.getServerId(), 
secondLeaderAddress.getId())) {
+                secondLeader = coordinatorServer;
+                break;
+            }
+        }
+        CoordinatorServer nonLeader = null;
+        for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+            if (!Objects.equals(coordinatorServer.getServerId(), 
firstLeaderAddress.getId())
+                    && !Objects.equals(
+                            coordinatorServer.getServerId(), 
secondLeaderAddress.getId())) {
+                nonLeader = coordinatorServer;
+                break;
+            }
+        }
+        // kill the other 2 coordinator servers, leaving only the first leader
+        nonLeader.close();
+        secondLeader.close();
+
+        // the original coordinator server should become the leader again
+        waitUntilCoordinatorServerReelected(secondLeaderAddress);
+        CoordinatorAddress thirdLeaderAddress = 
zookeeperClient.getCoordinatorLeaderAddress().get();
+
+        
assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId());
+    }
+
+    /** Create a configuration with Zookeeper address setting. */
+    protected static Configuration createConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                ConfigOptions.ZOOKEEPER_ADDRESS,
+                
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString());
+        configuration.setString(
+                ConfigOptions.BIND_LISTENERS, 
"CLIENT://localhost:0,FLUSS://localhost:0");
+        configuration.setString(ConfigOptions.ADVERTISED_LISTENERS, 
"CLIENT://198.168.0.1:100");
+        configuration.set(ConfigOptions.REMOTE_DATA_DIR, 
"/tmp/fluss/remote-data");
+
+        return configuration;
+    }
+
+    public void waitUntilCoordinatorServerElected() {
+        waitUntil(
+                () -> 
zookeeperClient.getCoordinatorLeaderAddress().isPresent(),
+                Duration.ofMinutes(1),
+                "Fail to wait coordinator server elected");
+    }
+
+    public void waitUntilCoordinatorServerReelected(CoordinatorAddress 
originAddress) {
+        waitUntil(
+                () ->
+                        
zookeeperClient.getCoordinatorLeaderAddress().isPresent()
+                                && !zookeeperClient
+                                        .getCoordinatorLeaderAddress()
+                                        .get()
+                                        .equals(originAddress),
+                Duration.ofMinutes(1),
+                "Fail to wait coordinator server reelected");
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java
index bd060d6ee..ae39bd185 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java
@@ -63,6 +63,7 @@ class CoordinatorServerITCase extends ServerITCaseBase {
                 ConfigOptions.BIND_LISTENERS,
                 String.format("%s://%s:%d", DEFAULT_LISTENER_NAME, HOSTNAME, 
getPort()));
         conf.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
+
         return conf;
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java
index aee7f0d47..ecdc4e3dc 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java
@@ -26,8 +26,10 @@ import org.apache.fluss.server.zk.data.CoordinatorAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
+import java.time.Duration;
 import java.util.Optional;
 
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CoordinatorServer} . */
@@ -38,6 +40,7 @@ class CoordinatorServerTest extends ServerTestBase {
     @BeforeEach
     void beforeEach() throws Exception {
         coordinatorServer = startCoordinatorServer(createConfiguration());
+        waitUntilCoordinatorServerElected();
     }
 
     @AfterEach
@@ -55,7 +58,11 @@ class CoordinatorServerTest extends ServerTestBase {
     @Override
     protected ServerBase getStartFailServer() {
         Configuration configuration = createConfiguration();
-        configuration.set(ConfigOptions.BIND_LISTENERS, 
"CLIENT://localhost:-12");
+        // CoordinatorServer starts leader services asynchronously in a 
separate election
+        // thread. An invalid port wouldn't cause start() to throw because the 
port binding
+        // happens asynchronously. Instead, use an empty ZK address to cause a 
synchronous
+        // failure in startZookeeperClient() during start().
+        configuration.setString(ConfigOptions.ZOOKEEPER_ADDRESS, "");
         return new CoordinatorServer(configuration);
     }
 
@@ -63,10 +70,18 @@ class CoordinatorServerTest extends ServerTestBase {
     protected void checkAfterStartServer() throws Exception {
         assertThat(coordinatorServer.getRpcServer()).isNotNull();
         // check the data put in zk after coordinator server start
-        Optional<CoordinatorAddress> optCoordinatorAddr = 
zookeeperClient.getCoordinatorAddress();
+        Optional<CoordinatorAddress> optCoordinatorAddr =
+                zookeeperClient.getCoordinatorLeaderAddress();
         assertThat(optCoordinatorAddr).isNotEmpty();
         verifyEndpoint(
                 optCoordinatorAddr.get().getEndpoints(),
                 coordinatorServer.getRpcServer().getBindEndpoints());
     }
+
+    public void waitUntilCoordinatorServerElected() {
+        waitUntil(
+                () -> 
zookeeperClient.getCoordinatorLeaderAddress().isPresent(),
+                Duration.ofSeconds(5),
+                "Fail to wait coordinator server elected");
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcherTest.java
similarity index 64%
copy from 
fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java
copy to 
fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcherTest.java
index d7739028d..a9f181915 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcherTest.java
@@ -17,17 +17,14 @@
 
 package org.apache.fluss.server.coordinator.event.watcher;
 
-import org.apache.fluss.cluster.Endpoint;
-import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
-import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent;
-import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
 import org.apache.fluss.server.coordinator.event.TestingEventManager;
-import org.apache.fluss.server.metadata.ServerInfo;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
-import org.apache.fluss.server.zk.data.TabletServerRegistration;
+import org.apache.fluss.server.zk.data.CoordinatorAddress;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
 
 import org.junit.jupiter.api.Test;
@@ -35,46 +32,36 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link TabletServerChangeWatcher} . */
-class TabletServerChangeWatcherTest {
+/** Test for {@link CoordinatorChangeWatcher} . */
+class CoordinatorChangeWatcherTest {
 
     @RegisterExtension
     public static final AllCallbackWrapper<ZooKeeperExtension> 
ZOO_KEEPER_EXTENSION_WRAPPER =
             new AllCallbackWrapper<>(new ZooKeeperExtension());
 
     @Test
-    void testServetChanges() throws Exception {
+    void testServerChanges() throws Exception {
         ZooKeeperClient zookeeperClient =
                 ZOO_KEEPER_EXTENSION_WRAPPER
                         .getCustomExtension()
                         .getZooKeeperClient(NOPErrorHandler.INSTANCE);
         TestingEventManager eventManager = new TestingEventManager();
-        TabletServerChangeWatcher tabletServerChangeWatcher =
-                new TabletServerChangeWatcher(zookeeperClient, eventManager);
-        tabletServerChangeWatcher.start();
+        CoordinatorChangeWatcher coordinatorChangeWatcher =
+                new CoordinatorChangeWatcher(zookeeperClient, eventManager);
+        coordinatorChangeWatcher.start();
 
         // register new servers
         List<CoordinatorEvent> expectedEvents = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            TabletServerRegistration tabletServerRegistration =
-                    new TabletServerRegistration(
-                            "rack" + i,
-                            Collections.singletonList(new Endpoint("host" + i, 
1234, "CLIENT")),
-                            System.currentTimeMillis());
-            expectedEvents.add(
-                    new NewTabletServerEvent(
-                            new ServerInfo(
-                                    i,
-                                    tabletServerRegistration.getRack(),
-                                    tabletServerRegistration.getEndpoints(),
-                                    ServerType.TABLET_SERVER)));
-            zookeeperClient.registerTabletServer(i, tabletServerRegistration);
+            expectedEvents.add(new NewCoordinatorEvent(String.valueOf(i)));
+            CoordinatorAddress coordinatorAddress =
+                    new CoordinatorAddress(String.valueOf(i), new 
ArrayList<>());
+            zookeeperClient.registerCoordinatorServer(coordinatorAddress);
         }
 
         retry(
@@ -88,7 +75,7 @@ class TabletServerChangeWatcherTest {
 
         // unregister servers
         for (int i = 0; i < 10; i++) {
-            expectedEvents.add(new DeadTabletServerEvent(i));
+            expectedEvents.add(new DeadCoordinatorEvent(String.valueOf(i)));
         }
 
         retry(
@@ -97,6 +84,6 @@ class TabletServerChangeWatcherTest {
                         assertThat(eventManager.getEvents())
                                 
.containsExactlyInAnyOrderElementsOf(expectedEvents));
 
-        tabletServerChangeWatcher.stop();
+        coordinatorChangeWatcher.stop();
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java
index d7739028d..6da8dfacd 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java
@@ -49,7 +49,7 @@ class TabletServerChangeWatcherTest {
             new AllCallbackWrapper<>(new ZooKeeperExtension());
 
     @Test
-    void testServetChanges() throws Exception {
+    void testServerChanges() throws Exception {
         ZooKeeperClient zookeeperClient =
                 ZOO_KEEPER_EXTENSION_WRAPPER
                         .getCustomExtension()
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index baabd58cb..8ce9f6a7c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -266,6 +266,7 @@ public final class FlussClusterExtension
             setRemoteDataDir(conf);
             coordinatorServer = new CoordinatorServer(conf, clock);
             coordinatorServer.start();
+            waitUntilCoordinatorServerElected();
             coordinatorServerInfo =
                     // TODO, Currently, we use 0 as coordinator server id.
                     new ServerInfo(
@@ -276,6 +277,7 @@ public final class FlussClusterExtension
         } else {
             // start the existing coordinator server
             coordinatorServer.start();
+            waitUntilCoordinatorServerElected();
             coordinatorServerInfo =
                     new ServerInfo(
                             0,
@@ -941,6 +943,15 @@ public final class FlussClusterExtension
         return coordinatorServer;
     }
 
+    public void waitUntilCoordinatorServerElected() throws Exception {
+        coordinatorServer.getLeaderElectionFuture().get();
+
+        waitUntil(
+                () -> 
zooKeeperClient.getCoordinatorLeaderAddress().isPresent(),
+                Duration.ofSeconds(10),
+                "Fail to wait coordinator server elected");
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     /** Builder for {@link FlussClusterExtension}. */
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
index ac8bf705a..c867b8dce 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
@@ -109,14 +109,14 @@ class ZooKeeperClientTest {
     void testCoordinatorLeader() throws Exception {
         // try to get leader address, should return empty since node leader 
address stored in
         // zk
-        assertThat(zookeeperClient.getCoordinatorAddress()).isEmpty();
+        assertThat(zookeeperClient.getCoordinatorLeaderAddress()).isEmpty();
         CoordinatorAddress coordinatorAddress =
                 new CoordinatorAddress(
                         "2", 
Endpoint.fromListenersString("CLIENT://localhost1:10012"));
         // register leader address
         zookeeperClient.registerCoordinatorLeader(coordinatorAddress);
         // check get leader address
-        CoordinatorAddress gottenAddress = 
zookeeperClient.getCoordinatorAddress().get();
+        CoordinatorAddress gottenAddress = 
zookeeperClient.getCoordinatorLeaderAddress().get();
         assertThat(gottenAddress).isEqualTo(coordinatorAddress);
     }
 
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index 97240938b..5e9e668be 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,10 +294,15 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-       <th rowspan="24"><strong>coordinator</strong></th>
-      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="9">-</td>
+       <th rowspan="25"><strong>coordinator</strong></th>
+      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="10">-</td>
       <td>activeCoordinatorCount</td>
-      <td>The number of active CoordinatorServer in this cluster.</td>
+      <td>The number of active CoordinatorServers (leader only) in this 
cluster.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>aliveCoordinatorCount</td>
+      <td>The number of alive CoordinatorServers (including the leader and standbys) 
in this cluster.</td>
       <td>Gauge</td>
     </tr>
     <tr>

Reply via email to