wuchong commented on code in PR #2780: URL: https://github.com/apache/fluss/pull/2780#discussion_r2987079487
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. 
*/ +public class CoordinatorLeaderElection implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class); + + private final String serverId; + private final LeaderLatch leaderLatch; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>(); + private volatile Thread electionThread; + + public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { + this.serverId = serverId; + this.leaderLatch = + new LeaderLatch( + zkClient.getCuratorClient(), + ZkData.CoordinatorElectionZNode.path(), + String.valueOf(serverId)); + } + + /** + * Starts the leader election process asynchronously. The returned future completes when this + * server becomes the leader and initializes the leader services. + * + * @param initLeaderServices the runnable to initialize leader services once elected + * @return a CompletableFuture that completes when this server becomes leader + */ + public CompletableFuture<Void> startElectLeaderAsync(Runnable initLeaderServices) { + leaderLatch.addListener( + new LeaderLatchListener() { + @Override + public void isLeader() { + LOG.info("Coordinator server {} has become the leader.", serverId); + isLeader.set(true); + } + + @Override + public void notLeader() { + relinquishLeadership(); + LOG.warn("Coordinator server {} has lost the leadership.", serverId); + } + }); + + try { + leaderLatch.start(); + LOG.info("Coordinator server {} started leader election.", serverId); + } catch (Exception e) { + LOG.error("Failed to start LeaderLatch for server {}", serverId, e); + leaderReadyFuture.completeExceptionally( + new RuntimeException("Leader election start failed", e)); + return leaderReadyFuture; + } + + // Run the await and initialization in a separate thread to avoid blocking + electionThread = + new Thread( + () -> { + try { + // todo: Currently, we await the leader latch and do nothing until 
+ // it becomes leader. + // Later we can make it as a hot backup server to continuously + // synchronize metadata from + // Zookeeper, which save time from recovering context + leaderLatch.await(); + doInitLeaderServices(initLeaderServices); + leaderReadyFuture.complete(null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info( + "Leader election for server {} was interrupted.", serverId); + leaderReadyFuture.completeExceptionally(e); + } catch (Exception e) { + LOG.error( + "Failed during leader election for server {}", serverId, e); + leaderReadyFuture.completeExceptionally(e); + } + }, + "coordinator-leader-election-" + serverId); + electionThread.start(); + + return leaderReadyFuture; + } + + private void doInitLeaderServices(Runnable initLeaderServices) { + initLeaderServices.run(); + } + + @Override + public void close() { + LOG.info("Closing LeaderLatch for server {}.", serverId); + + // Interrupt the election thread if it's waiting + if (electionThread != null && electionThread.isAlive()) { + electionThread.interrupt(); + } + + if (leaderLatch != null) { + try { + leaderLatch.close(); + } catch (Exception e) { + LOG.error("Failed to close LeaderLatch for server {}.", serverId, e); + } + } + + // Complete the future exceptionally if it hasn't been completed yet + leaderReadyFuture.completeExceptionally( + new RuntimeException("Leader election closed for server " + serverId)); + } + + public boolean isLeader() { + return this.isLeader.get(); + } + + private void relinquishLeadership() { + isLeader.set(false); + LOG.info("Coordinator server {} has been fenced.", serverId); + + this.close(); Review Comment: We shouldn't close the election here; rather, the coordinator should re-enter the election state. 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. 
*/ +public class CoordinatorLeaderElection implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class); + + private final String serverId; + private final LeaderLatch leaderLatch; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>(); + private volatile Thread electionThread; + + public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { + this.serverId = serverId; + this.leaderLatch = + new LeaderLatch( + zkClient.getCuratorClient(), + ZkData.CoordinatorElectionZNode.path(), + String.valueOf(serverId)); + } + + /** + * Starts the leader election process asynchronously. The returned future completes when this + * server becomes the leader and initializes the leader services. + * + * @param initLeaderServices the runnable to initialize leader services once elected + * @return a CompletableFuture that completes when this server becomes leader + */ + public CompletableFuture<Void> startElectLeaderAsync(Runnable initLeaderServices) { + leaderLatch.addListener( + new LeaderLatchListener() { + @Override + public void isLeader() { + LOG.info("Coordinator server {} has become the leader.", serverId); + isLeader.set(true); + } + + @Override + public void notLeader() { + relinquishLeadership(); + LOG.warn("Coordinator server {} has lost the leadership.", serverId); + } + }); + + try { + leaderLatch.start(); + LOG.info("Coordinator server {} started leader election.", serverId); + } catch (Exception e) { + LOG.error("Failed to start LeaderLatch for server {}", serverId, e); + leaderReadyFuture.completeExceptionally( + new RuntimeException("Leader election start failed", e)); + return leaderReadyFuture; + } + + // Run the await and initialization in a separate thread to avoid blocking + electionThread = + new Thread( + () -> { + try { + // todo: Currently, we await the leader latch and do nothing until 
+ // it becomes leader. + // Later we can make it as a hot backup server to continuously + // synchronize metadata from + // Zookeeper, which save time from recovering context + leaderLatch.await(); + doInitLeaderServices(initLeaderServices); Review Comment: This additional `electionThread` appears redundant. We can invoke `doInitLeaderServices(initLeaderServices)` directly within the `LeaderLatchListener#isLeader` callback, thereby eliminating the overhead of an extra thread. Furthermore, the current logic fails to address how the coordinator should re-enter the leader election state after transitioning from a leader to a follower, such as during network fluctuations or full GC events. By removing the dedicated thread and consolidating the logic within the `LeaderLatchListener`, the implementation becomes significantly simpler. Specifically, we can incorporate the logic to resume leader contention directly in the `notLeader` method, removing the need to manually start and manage a separate `electionThread`. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. */ +public class CoordinatorLeaderElection implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class); + + private final String serverId; + private final LeaderLatch leaderLatch; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>(); + private volatile Thread electionThread; + + public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { + this.serverId = serverId; + this.leaderLatch = + new LeaderLatch( + zkClient.getCuratorClient(), + ZkData.CoordinatorElectionZNode.path(), + String.valueOf(serverId)); + } + + /** + * Starts the leader election process asynchronously. The returned future completes when this + * server becomes the leader and initializes the leader services. + * + * @param initLeaderServices the runnable to initialize leader services once elected + * @return a CompletableFuture that completes when this server becomes leader + */ + public CompletableFuture<Void> startElectLeaderAsync(Runnable initLeaderServices) { + leaderLatch.addListener( + new LeaderLatchListener() { + @Override + public void isLeader() { + LOG.info("Coordinator server {} has become the leader.", serverId); + isLeader.set(true); Review Comment: The `isLeader` is never used which I think can be removed. 
########## fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.CoordinatorAddress; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; +import static org.assertj.core.api.Assertions.assertThat; + +class CoordinatorServerElectionTest { Review Comment: We should add one more test for the case where a coordinator goes from leader to follower, verifying that it can still take part in the election and become leader again. 
########## website/docs/maintenance/observability/monitor-metrics.md: ########## @@ -294,12 +294,17 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM </thead> <tbody> <tr> - <th rowspan="24"><strong>coordinator</strong></th> - <td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="10">-</td> + <th rowspan="25"><strong>coordinator</strong></th> + <td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="11">-</td> <td>activeCoordinatorCount</td> <td>The number of active CoordinatorServer in this cluster.</td> Review Comment: nit: ```suggestion <td>The number of active CoordinatorServer (only leader) in this cluster.</td> ``` ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java: ########## @@ -170,10 +176,38 @@ public static void main(String[] args) { @Override protected void startServices() throws Exception { + electCoordinatorLeaderAsync(); + } + + private CompletableFuture<Void> electCoordinatorLeaderAsync() throws Exception { + this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); + + // Coordinator Server supports high availability. If 3 coordinator servers are alive, + // one of them will be elected as leader and the other two will be standby. + // When leader fails, one of standby coordinators will be elected as new leader. 
+ registerCoordinatorServer(); + ZooKeeperUtils.registerZookeeperClientReInitSessionListener( + zkClient, this::registerCoordinatorServer, this); + + // standby + this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId); + this.leaderElectionFuture = + coordinatorLeaderElection.startElectLeaderAsync( + () -> { + try { + startCoordinatorLeaderService(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return leaderElectionFuture; + } + + protected void startCoordinatorLeaderService() throws Exception { Review Comment: nit: `startLeaderServices` would be simpler and clearer, and we can introduce `stopLeaderServices` for the inverse operation. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java: ########## @@ -170,10 +176,38 @@ public static void main(String[] args) { @Override protected void startServices() throws Exception { + electCoordinatorLeaderAsync(); + } + + private CompletableFuture<Void> electCoordinatorLeaderAsync() throws Exception { + this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); + + // Coordinator Server supports high availability. If 3 coordinator servers are alive, + // one of them will be elected as leader and the other two will be standby. + // When leader fails, one of standby coordinators will be elected as new leader. + registerCoordinatorServer(); + ZooKeeperUtils.registerZookeeperClientReInitSessionListener( + zkClient, this::registerCoordinatorServer, this); + + // standby + this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId); + this.leaderElectionFuture = + coordinatorLeaderElection.startElectLeaderAsync( + () -> { + try { + startCoordinatorLeaderService(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return leaderElectionFuture; Review Comment: The return value is never used. 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent; +import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A watcher to watch the coordinator server changes(new/delete) in zookeeper. 
*/ +public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorChangeWatcher.class); + + public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) { + super(zooKeeperClient, eventManager, ZkData.CoordinatorIdsZNode.path()); + } + + @Override + protected CuratorCacheListener createListener() { + return new CoordinatorChangeListener(); + } + + @Override + protected String getWatcherName() { + return "CoordinatorChangeWatcher"; + } + + protected String getServerIdFromEvent(ChildData data) { + try { + return ZKPaths.getNodeFromPath(data.getPath()); + } catch (NumberFormatException e) { + throw new FlussRuntimeException( + "Invalid server id in zookeeper path: " + data.getPath(), e); + } + } + + private final class CoordinatorChangeListener implements CuratorCacheListener { + + @Override + public void event(Type type, ChildData oldData, ChildData newData) { + if (newData != null) { + LOG.debug("Received {} event (path: {})", type, newData.getPath()); + } else { + LOG.debug("Received {} event", type); + } + + switch (type) { + case NODE_CREATED: + { + if (newData != null && newData.getData().length > 0) { + String serverId = getServerIdFromEvent(newData); + LOG.info("Received CHILD_ADDED event for server {}.", serverId); + eventManager.put(new NewCoordinatorEvent(serverId)); + } + break; + } + case NODE_DELETED: + { + if (oldData != null && oldData.getData().length > 0) { Review Comment: [P3] `registerCoordinatorServer()` creates `/coordinators/ids/{serverId}` with no payload, so these `length > 0` guards are false for both create and delete notifications. As a result `NewCoordinatorEvent` and `DeadCoordinatorEvent` are never emitted, and the new `aliveCoordinatorCount` gauge never reflects coordinators joining or leaving after startup. 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent; +import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A watcher to watch the coordinator server changes(new/delete) in zookeeper. 
*/ +public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorChangeWatcher.class); + + public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) { + super(zooKeeperClient, eventManager, ZkData.CoordinatorIdsZNode.path()); + } + + @Override + protected CuratorCacheListener createListener() { + return new CoordinatorChangeListener(); + } + + @Override + protected String getWatcherName() { + return "CoordinatorChangeWatcher"; + } + + protected String getServerIdFromEvent(ChildData data) { + try { + return ZKPaths.getNodeFromPath(data.getPath()); + } catch (NumberFormatException e) { + throw new FlussRuntimeException( + "Invalid server id in zookeeper path: " + data.getPath(), e); + } Review Comment: +1, we should try catch general `Exception` or remove the retry catch here. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. */ +public class CoordinatorLeaderElection implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class); + + private final String serverId; + private final LeaderLatch leaderLatch; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>(); + private volatile Thread electionThread; + + public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { + this.serverId = serverId; + this.leaderLatch = + new LeaderLatch( + zkClient.getCuratorClient(), + ZkData.CoordinatorElectionZNode.path(), + String.valueOf(serverId)); + } + + /** + * Starts the leader election process asynchronously. The returned future completes when this + * server becomes the leader and initializes the leader services. 
+ * + * @param initLeaderServices the runnable to initialize leader services once elected + * @return a CompletableFuture that completes when this server becomes leader + */ + public CompletableFuture<Void> startElectLeaderAsync(Runnable initLeaderServices) { + leaderLatch.addListener( + new LeaderLatchListener() { + @Override + public void isLeader() { + LOG.info("Coordinator server {} has become the leader.", serverId); + isLeader.set(true); + } + + @Override + public void notLeader() { + relinquishLeadership(); Review Comment: +1. We must ensure the coordinator re-enters the leader election state after transitioning from a leader to a follower. To achieve this, we should stop all leader services by executing the inverse operations of `startCoordinatorLeaderService`. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java: ########## @@ -178,20 +177,35 @@ public Optional<byte[]> getOrEmpty(String path) throws Exception { // Coordinator server // -------------------------------------------------------------------------------------------- - /** Register a coordinator leader server to ZK. */ + /** Register a coordinator server to ZK. */ + public void registerCoordinatorServer(String coordinatorId) throws Exception { + String path = ZkData.CoordinatorIdZNode.path(coordinatorId); + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); Review Comment: `registerCoordinatorServer()` creates `/coordinators/ids/{serverId}` with no payload, so these length > 0 guards are false for both create and delete notifications. As a result NewCoordinatorEvent and DeadCoordinatorEvent are never emitted, and the new aliveCoordinatorCount gauge never reflects coordinators joining or leaving after startup. I suggest to serialize the coordinator address in it, so it can have data in it and it would be helpful for debug. Otherwise, we don't know who is the server id (as the server id are randomly generated). 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java: ########## @@ -170,10 +176,38 @@ public static void main(String[] args) { @Override protected void startServices() throws Exception { + electCoordinatorLeaderAsync(); + } + + private CompletableFuture<Void> electCoordinatorLeaderAsync() throws Exception { + this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); + + // Coordinator Server supports high availability. If 3 coordinator servers are alive, + // one of them will be elected as leader and the other two will be standby. + // When leader fails, one of standby coordinators will be elected as new leader. + registerCoordinatorServer(); + ZooKeeperUtils.registerZookeeperClientReInitSessionListener( + zkClient, this::registerCoordinatorServer, this); + + // standby + this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId); + this.leaderElectionFuture = + coordinatorLeaderElection.startElectLeaderAsync( + () -> { + try { + startCoordinatorLeaderService(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); Review Comment: [P1] The future returned here is only stored; nothing in production chains it to `onFatalError()` or `terminationFuture`. If the election thread later fails (for example `LeaderLatch.start()` fails, or `startCoordinatorLeaderService()` hits a bad bind/configuration during RPC or metadata initialization), `CoordinatorServer.start()` has already succeeded and the process stays alive without a usable coordinator instead of failing fast. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
