Copilot commented on code in PR #2780:
URL: https://github.com/apache/fluss/pull/2780#discussion_r2969309912


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -306,6 +334,41 @@ protected CompletableFuture<Result> closeAsync(Result 
result) {
         return terminationFuture;
     }
 
+    private void registerCoordinatorServer() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // we need to retry to register since although
+        // zkClient reconnect, the ephemeral node may still exist
+        // for a while time, retry to wait the ephemeral node removed
+        // see ZOOKEEPER-2985
+        while (true) {
+            try {
+                zkClient.registerCoordinatorServer(this.serverId);
+                break;
+            } catch (KeeperException.NodeExistsException nodeExistsException) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) {
+                    LOG.error(
+                            "Coordinator Server register to Zookeeper exceeded 
total retry time of {} ms. "
+                                    + "Aborting registration attempts.",
+                            ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS);
+                    throw nodeExistsException;
+                }
+
+                LOG.warn(
+                        "Coordinator server already registered in Zookeeper. "
+                                + "retrying register after {} ms....",
+                        ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+                try {
+                    Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+                } catch (InterruptedException interruptedException) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }

Review Comment:
   In `registerCoordinatorServer()`, if the retry sleep is interrupted you 
`break` out of the loop, which can return without registering the coordinator 
server znode. That leaves the process in a partially-started state (it can 
still participate in election / run leader services) but won't show up in 
`getCoordinatorServerList()` and related metrics. Prefer propagating the 
interruption (rethrow `InterruptedException` or wrap it) so startup fails fast, 
or continue retrying with interruption-aware shutdown logic.



##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.CoordinatorAddress;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class CoordinatorServerElectionTest {
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> 
ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    protected static ZooKeeperClient zookeeperClient;
+
+    @BeforeAll
+    static void baseBeforeAll() {
+        zookeeperClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @Test
+    void testCoordinatorServerElection() throws Exception {
+        CoordinatorServer coordinatorServer1 = new 
CoordinatorServer(createConfiguration());
+        CoordinatorServer coordinatorServer2 = new 
CoordinatorServer(createConfiguration());
+        CoordinatorServer coordinatorServer3 = new 
CoordinatorServer(createConfiguration());
+
+        List<CoordinatorServer> coordinatorServerList =
+                Arrays.asList(coordinatorServer1, coordinatorServer2, 
coordinatorServer3);
+
+        // start 3 coordinator servers
+        for (int i = 0; i < 3; i++) {
+            CoordinatorServer server = coordinatorServerList.get(i);
+            server.start();
+        }
+
+        // random coordinator become leader
+        waitUntilCoordinatorServerElected();
+
+        CoordinatorAddress firstLeaderAddress = 
zookeeperClient.getCoordinatorLeaderAddress().get();
+
+        // Find the leader and try to restart it.
+        CoordinatorServer firstLeader = null;
+        for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+            if (Objects.equals(coordinatorServer.getServerId(), 
firstLeaderAddress.getId())) {
+                firstLeader = coordinatorServer;
+                break;
+            }
+        }
+        assertThat(firstLeader).isNotNull();
+        firstLeader.close();
+        firstLeader.start();
+
+        // Then we should get another Coordinator server leader elected
+        waitUntilCoordinatorServerReelected(firstLeaderAddress);
+        CoordinatorAddress secondLeaderAddress =
+                zookeeperClient.getCoordinatorLeaderAddress().get();
+        assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
+
+        CoordinatorServer secondLeader = null;
+        for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+            if (Objects.equals(coordinatorServer.getServerId(), 
secondLeaderAddress.getId())) {
+                secondLeader = coordinatorServer;
+                break;
+            }
+        }
+        CoordinatorServer nonLeader = null;

Review Comment:
   This test closes and then restarts the same `CoordinatorServer` instance 
(`firstLeader.close(); firstLeader.start();`). `CoordinatorServer.closeAsync()` 
permanently flips `isShutDown` and completes `terminationFuture`, so the 
instance is not safely restartable (subsequent `close()` may become a no-op and 
resources can leak). To model leader failure/rejoin, create a new 
`CoordinatorServer` instance for the restarted node (or enhance 
`CoordinatorServer` to explicitly support restart semantics) and ensure all 
started servers are closed at the end of the test.
   ```suggestion
           try {
               // start 3 coordinator servers
               for (int i = 0; i < 3; i++) {
                   CoordinatorServer server = coordinatorServerList.get(i);
                   server.start();
               }
   
               // random coordinator become leader
               waitUntilCoordinatorServerElected();
   
               CoordinatorAddress firstLeaderAddress =
                       zookeeperClient.getCoordinatorLeaderAddress().get();
   
               // Find the leader and simulate its failure and rejoin with a 
new instance.
               CoordinatorServer firstLeader = null;
               int firstLeaderIndex = -1;
               for (int i = 0; i < coordinatorServerList.size(); i++) {
                   CoordinatorServer coordinatorServer = 
coordinatorServerList.get(i);
                   if (Objects.equals(coordinatorServer.getServerId(), 
firstLeaderAddress.getId())) {
                       firstLeader = coordinatorServer;
                       firstLeaderIndex = i;
                       break;
                   }
               }
               assertThat(firstLeader).isNotNull();
               firstLeader.close();
   
               // Create a new CoordinatorServer instance to model the node 
rejoining.
               CoordinatorServer restartedCoordinator =
                       new CoordinatorServer(createConfiguration());
               coordinatorServerList.set(firstLeaderIndex, 
restartedCoordinator);
               restartedCoordinator.start();
   
               // Then we should get another Coordinator server leader elected
               waitUntilCoordinatorServerReelected(firstLeaderAddress);
               CoordinatorAddress secondLeaderAddress =
                       zookeeperClient.getCoordinatorLeaderAddress().get();
               assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
   
               CoordinatorServer secondLeader = null;
               for (CoordinatorServer coordinatorServer : 
coordinatorServerList) {
                   if (Objects.equals(coordinatorServer.getServerId(), 
secondLeaderAddress.getId())) {
                       secondLeader = coordinatorServer;
                       break;
                   }
               }
               CoordinatorServer nonLeader = null;
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -305,13 +305,44 @@ public static String path() {
     // 
------------------------------------------------------------------------------------------
 
     /**
-     * The znode for the active coordinator. The znode path is:
+     * The znode for alive coordinators. The znode path is:
      *
-     * <p>/coordinators/active
+     * <p>/coordinators/ids
+     */
+    public static final class CoordinatorIdsZNode {
+        public static String path() {
+            return "/coordinators/ids";
+        }
+    }
+
+    /**
+     * The znode for a registered Coordinator information. The znode path is:
+     *
+     * <p>/coordinators/ids/[serverId]
+     */
+    public static final class CoordinatorIdZNode {
+        public static String path(String serverId) {
+            return CoordinatorIdsZNode.path() + "/" + serverId;
+        }
+    }
+
+    /**
+     * The znode for the coordinator leader election. The znode path is:
+     *
+     * <p>/coordinators/election
+     */
+    public static final class CoordinatorElectionZNode {
+        public static String path() {
+            return "/coordinators/election";
+        }
+    }
+
+    /**
+     * The znode for the active coordinator leader. The znode path is:
      *
-     * <p>Note: introduce standby coordinators in the future for znode 
"/coordinators/standby/".
+     * <p>/coordinators/leader
      */
-    public static final class CoordinatorZNode {
+    public static final class CoordinatorLeaderZNode {
         public static String path() {
             return "/coordinators/active";
         }

Review Comment:
   `CoordinatorLeaderZNode` JavaDoc says the znode path is 
`/coordinators/leader`, but `path()` currently returns `/coordinators/active`. 
This mismatch is likely to confuse operators and can lead to clients 
reading/writing different znodes if they follow the documentation. Please 
either update the JavaDoc to match the actual path (if keeping backward 
compatibility), or change `path()` to return `/coordinators/leader` (and handle 
migration/compat if needed).



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -170,10 +176,38 @@ public static void main(String[] args) {
 
     @Override
     protected void startServices() throws Exception {
+        electCoordinatorLeaderAsync();
+    }
+
+    private CompletableFuture<Void> electCoordinatorLeaderAsync() throws 
Exception {
+        this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
+
+        // Coordinator Server supports high availability. If 3 coordinator 
servers are alive,
+        // one of them will be elected as leader and the other two will be 
standby.
+        // When leader fails, one of standby coordinators will be elected as 
new leader.
+        registerCoordinatorServer();
+        ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
+                zkClient, this::registerCoordinatorServer, this);
+
+        // standby
+        this.coordinatorLeaderElection = new 
CoordinatorLeaderElection(zkClient, serverId);
+        this.leaderElectionFuture =
+                coordinatorLeaderElection.startElectLeaderAsync(
+                        () -> {
+                            try {
+                                startCoordinatorLeaderService();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        return leaderElectionFuture;
+    }

Review Comment:
   `startServices()` now returns immediately after starting leader election, so 
`CoordinatorServer.start()` can return before RPC/metrics/event-processor are 
initialized (previously they were started synchronously). This makes startup 
failures in `startCoordinatorLeaderService()` asynchronous and they won't be 
reported via `ServerBase.start()` (which only observes exceptions thrown from 
`startServices()`). Consider blocking in `startServices()` until either (a) 
this node becomes leader and leader services have started, or (b) leader 
election has definitively failed; alternatively, add explicit readiness/failure 
propagation so callers don't treat the server as started while it's still 
waiting/electing.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes(new/delete) in 
zookeeper. */
+public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorChangeWatcher.class);
+
+    public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, 
EventManager eventManager) {
+        super(zooKeeperClient, eventManager, 
ZkData.CoordinatorIdsZNode.path());
+    }
+
+    @Override
+    protected CuratorCacheListener createListener() {
+        return new CoordinatorChangeListener();
+    }
+
+    @Override
+    protected String getWatcherName() {
+        return "CoordinatorChangeWatcher";
+    }
+
+    protected String getServerIdFromEvent(ChildData data) {
+        try {
+            return ZKPaths.getNodeFromPath(data.getPath());
+        } catch (NumberFormatException e) {
+            throw new FlussRuntimeException(
+                    "Invalid server id in zookeeper path: " + data.getPath(), 
e);
+        }
+    }
+
+    private final class CoordinatorChangeListener implements 
CuratorCacheListener {
+
+        @Override
+        public void event(Type type, ChildData oldData, ChildData newData) {
+            if (newData != null) {
+                LOG.debug("Received {} event (path: {})", type, 
newData.getPath());
+            } else {
+                LOG.debug("Received {} event", type);
+            }
+
+            switch (type) {
+                case NODE_CREATED:
+                    {
+                        if (newData != null && newData.getData().length > 0) {
+                            String serverId = getServerIdFromEvent(newData);
+                            LOG.info("Received CHILD_ADDED event for server 
{}.", serverId);
+                            eventManager.put(new 
NewCoordinatorEvent(serverId));
+                        }
+                        break;
+                    }
+                case NODE_DELETED:
+                    {
+                        if (oldData != null && oldData.getData().length > 0) {
+                            String serverId = getServerIdFromEvent(oldData);
+                            LOG.info("Received CHILD_REMOVED event for server 
{}.", serverId);
+                            eventManager.put(new 
DeadCoordinatorEvent(serverId));
+                        }

Review Comment:
   `CoordinatorChangeWatcher` ignores create/delete events unless 
`ChildData.getData().length > 0`. But coordinator server registration nodes are 
created with no payload (`ZooKeeperClient.registerCoordinatorServer` uses 
`forPath(path)`), so `length` will be 0 (and `getData()` may even be null). 
This prevents `NewCoordinatorEvent`/`DeadCoordinatorEvent` from ever being 
emitted and can NPE on `getData()`. Consider removing the data-length gate (use 
the id from the path), or writing a non-empty payload when registering 
coordinator servers, and always null-check `getData()`.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes(new/delete) in 
zookeeper. */
+public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorChangeWatcher.class);
+
+    public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, 
EventManager eventManager) {
+        super(zooKeeperClient, eventManager, 
ZkData.CoordinatorIdsZNode.path());
+    }
+
+    @Override
+    protected CuratorCacheListener createListener() {
+        return new CoordinatorChangeListener();
+    }
+
+    @Override
+    protected String getWatcherName() {
+        return "CoordinatorChangeWatcher";
+    }
+
+    protected String getServerIdFromEvent(ChildData data) {
+        try {
+            return ZKPaths.getNodeFromPath(data.getPath());
+        } catch (NumberFormatException e) {
+            throw new FlussRuntimeException(
+                    "Invalid server id in zookeeper path: " + data.getPath(), 
e);
+        }

Review Comment:
   `getServerIdFromEvent()` catches `NumberFormatException`, but it doesn't 
parse the server id (it only calls `ZKPaths.getNodeFromPath`). This catch block 
appears to be leftover from the tablet-server watcher and makes the intent 
unclear. Consider removing the try/catch or validating the server id format 
explicitly if needed.
   ```suggestion
           return ZKPaths.getNodeFromPath(data.getPath());
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Using by coordinator server. Coordinator servers listen ZK node and elect 
leadership. */
+public class CoordinatorLeaderElection implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorLeaderElection.class);
+
+    private final String serverId;
+    private final LeaderLatch leaderLatch;
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+    private final CompletableFuture<Void> leaderReadyFuture = new 
CompletableFuture<>();
+    private volatile Thread electionThread;
+
+    public CoordinatorLeaderElection(ZooKeeperClient zkClient, String 
serverId) {
+        this.serverId = serverId;
+        this.leaderLatch =
+                new LeaderLatch(
+                        zkClient.getCuratorClient(),
+                        ZkData.CoordinatorElectionZNode.path(),
+                        String.valueOf(serverId));
+    }
+
+    /**
+     * Starts the leader election process asynchronously. The returned future 
completes when this
+     * server becomes the leader and initializes the leader services.
+     *
+     * @param initLeaderServices the runnable to initialize leader services 
once elected
+     * @return a CompletableFuture that completes when this server becomes 
leader
+     */
+    public CompletableFuture<Void> startElectLeaderAsync(Runnable 
initLeaderServices) {
+        leaderLatch.addListener(
+                new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        LOG.info("Coordinator server {} has become the 
leader.", serverId);
+                        isLeader.set(true);
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        relinquishLeadership();

Review Comment:
   `CoordinatorLeaderElection.notLeader()` calls `relinquishLeadership()`, 
which closes the `LeaderLatch` and completes the election future exceptionally. 
This effectively removes the server from future elections on transient 
disconnects or leadership loss, and for standby nodes it can prevent them from 
ever becoming leader later. Instead of closing the latch on `notLeader`, 
consider keeping the latch running and only stopping/cleaning up leader-only 
services when leadership is lost; if the process should exit on fencing, 
explicitly propagate that to `CoordinatorServer` so it shuts down.
   ```suggestion
                           isLeader.set(false);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to