Copilot commented on code in PR #2780:
URL: https://github.com/apache/fluss/pull/2780#discussion_r2969309912
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -306,6 +334,41 @@ protected CompletableFuture<Result> closeAsync(Result result) {
return terminationFuture;
}
+ private void registerCoordinatorServer() throws Exception {
+ long startTime = System.currentTimeMillis();
+
+ // we need to retry to register since although
+ // zkClient reconnect, the ephemeral node may still exist
+ // for a while time, retry to wait the ephemeral node removed
+ // see ZOOKEEPER-2985
+ while (true) {
+ try {
+ zkClient.registerCoordinatorServer(this.serverId);
+ break;
+ } catch (KeeperException.NodeExistsException nodeExistsException) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) {
+ LOG.error(
+ "Coordinator Server register to Zookeeper exceeded total retry time of {} ms. "
+ + "Aborting registration attempts.",
+ ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS);
+ throw nodeExistsException;
+ }
+
+ LOG.warn(
+ "Coordinator server already registered in Zookeeper. "
+ + "retrying register after {} ms....",
+ ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+ try {
+ Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
Review Comment:
In `registerCoordinatorServer()`, if the retry sleep is interrupted you
`break` out of the loop, so the method can return without registering the
coordinator server znode. That leaves the process in a partially started
state: it can still participate in election and run leader services, but it
won't show up in `getCoordinatorServerList()` and related metrics. Prefer
propagating the interruption (rethrow `InterruptedException` or wrap it) so
startup fails fast, or continue retrying with interruption-aware shutdown logic.
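A minimal sketch of the fail-fast option, written against stand-in types so it compiles on its own. `RegisterRetrySketch`, its nested `NodeExistsException`, and the `Registration` interface are hypothetical placeholders for `KeeperException.NodeExistsException` and `zkClient.registerCoordinatorServer(serverId)`; only the control flow is the point.

```java
/** Hypothetical stand-in for the retry loop; none of these names are Fluss APIs. */
final class RegisterRetrySketch {

    /** Placeholder for KeeperException.NodeExistsException. */
    static final class NodeExistsException extends Exception {}

    /** Placeholder for zkClient.registerCoordinatorServer(serverId). */
    interface Registration {
        void register() throws NodeExistsException;
    }

    /**
     * Retries until registration succeeds or the total wait time is exceeded.
     * Returns the number of attempts. An interrupt during the sleep is NOT
     * swallowed: InterruptedException propagates so the caller's startup fails.
     */
    static int registerWithRetry(Registration registration, long totalWaitMs, long retryIntervalMs)
            throws NodeExistsException, InterruptedException {
        long startTime = System.currentTimeMillis();
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                registration.register();
                return attempts;
            } catch (NodeExistsException e) {
                if (System.currentTimeMillis() - startTime >= totalWaitMs) {
                    throw e;
                }
            }
            // No catch around sleep: the method never returns normally
            // unless register() actually succeeded.
            Thread.sleep(retryIntervalMs);
        }
    }
}
```

With this shape the only normal return path is a successful registration, so a caller cannot end up running without the znode.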
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.CoordinatorAddress;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class CoordinatorServerElectionTest {
+ @RegisterExtension
+ public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
+ new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+ protected static ZooKeeperClient zookeeperClient;
+
+ @BeforeAll
+ static void baseBeforeAll() {
+ zookeeperClient =
+ ZOO_KEEPER_EXTENSION_WRAPPER
+ .getCustomExtension()
+ .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+ }
+
+ @Test
+ void testCoordinatorServerElection() throws Exception {
+ CoordinatorServer coordinatorServer1 = new CoordinatorServer(createConfiguration());
+ CoordinatorServer coordinatorServer2 = new CoordinatorServer(createConfiguration());
+ CoordinatorServer coordinatorServer3 = new CoordinatorServer(createConfiguration());
+
+ List<CoordinatorServer> coordinatorServerList =
+ Arrays.asList(coordinatorServer1, coordinatorServer2, coordinatorServer3);
+
+ // start 3 coordinator servers
+ for (int i = 0; i < 3; i++) {
+ CoordinatorServer server = coordinatorServerList.get(i);
+ server.start();
+ }
+
+ // random coordinator become leader
+ waitUntilCoordinatorServerElected();
+
+ CoordinatorAddress firstLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get();
+
+ // Find the leader and try to restart it.
+ CoordinatorServer firstLeader = null;
+ for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+ if (Objects.equals(coordinatorServer.getServerId(), firstLeaderAddress.getId())) {
+ firstLeader = coordinatorServer;
+ break;
+ }
+ }
+ assertThat(firstLeader).isNotNull();
+ firstLeader.close();
+ firstLeader.start();
+
+ // Then we should get another Coordinator server leader elected
+ waitUntilCoordinatorServerReelected(firstLeaderAddress);
+ CoordinatorAddress secondLeaderAddress =
+ zookeeperClient.getCoordinatorLeaderAddress().get();
+ assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
+
+ CoordinatorServer secondLeader = null;
+ for (CoordinatorServer coordinatorServer : coordinatorServerList) {
+ if (Objects.equals(coordinatorServer.getServerId(), secondLeaderAddress.getId())) {
+ secondLeader = coordinatorServer;
+ break;
+ }
+ }
+ CoordinatorServer nonLeader = null;
Review Comment:
This test closes and then restarts the same `CoordinatorServer` instance
(`firstLeader.close(); firstLeader.start();`). `CoordinatorServer.closeAsync()`
permanently flips `isShutDown` and completes `terminationFuture`, so the
instance is not safely restartable (subsequent `close()` may become a no-op and
resources can leak). To model leader failure/rejoin, create a new
`CoordinatorServer` instance for the restarted node (or enhance
`CoordinatorServer` to explicitly support restart semantics) and ensure all
started servers are closed at the end of the test.
```suggestion
        try {
            // start 3 coordinator servers
            for (int i = 0; i < 3; i++) {
                CoordinatorServer server = coordinatorServerList.get(i);
                server.start();
            }
            // random coordinator become leader
            waitUntilCoordinatorServerElected();
            CoordinatorAddress firstLeaderAddress =
                    zookeeperClient.getCoordinatorLeaderAddress().get();
            // Find the leader and simulate its failure and rejoin with a new instance.
            CoordinatorServer firstLeader = null;
            int firstLeaderIndex = -1;
            for (int i = 0; i < coordinatorServerList.size(); i++) {
                CoordinatorServer coordinatorServer = coordinatorServerList.get(i);
                if (Objects.equals(coordinatorServer.getServerId(), firstLeaderAddress.getId())) {
                    firstLeader = coordinatorServer;
                    firstLeaderIndex = i;
                    break;
                }
            }
            assertThat(firstLeader).isNotNull();
            firstLeader.close();
            // Create a new CoordinatorServer instance to model the node rejoining.
            CoordinatorServer restartedCoordinator =
                    new CoordinatorServer(createConfiguration());
            coordinatorServerList.set(firstLeaderIndex, restartedCoordinator);
            restartedCoordinator.start();
            // Then we should get another Coordinator server leader elected
            waitUntilCoordinatorServerReelected(firstLeaderAddress);
            CoordinatorAddress secondLeaderAddress =
                    zookeeperClient.getCoordinatorLeaderAddress().get();
            assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress);
            CoordinatorServer secondLeader = null;
            for (CoordinatorServer coordinatorServer : coordinatorServerList) {
                if (Objects.equals(coordinatorServer.getServerId(), secondLeaderAddress.getId())) {
                    secondLeader = coordinatorServer;
                    break;
                }
            }
            CoordinatorServer nonLeader = null;
```
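The suggestion opens a `try` block but the cleanup side is outside the replaced range. One possible shape for the matching `finally` is a small helper that closes every started server even if one `close()` throws. This is only a sketch: `CloseAllSketch` and `closeAll` are hypothetical names, and it assumes the servers can be handled as `AutoCloseable`, which the PR does not confirm.

```java
import java.util.List;

/** Hypothetical test helper; not part of the PR. */
final class CloseAllSketch {

    /**
     * Closes every resource in order. A failure in one close() does not skip
     * the remaining resources; the first failure is rethrown at the end with
     * later failures attached as suppressed exceptions.
     */
    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

In the test this would be called from `finally` with `coordinatorServerList`, which by then also holds the replacement instance stored via `set(firstLeaderIndex, ...)`.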
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -305,13 +305,44 @@ public static String path() {
// ------------------------------------------------------------------------------------------
/**
- * The znode for the active coordinator. The znode path is:
+ * The znode for alive coordinators. The znode path is:
*
- * <p>/coordinators/active
+ * <p>/coordinators/ids
+ */
+ public static final class CoordinatorIdsZNode {
+ public static String path() {
+ return "/coordinators/ids";
+ }
+ }
+
+ /**
+ * The znode for a registered Coordinator information. The znode path is:
+ *
+ * <p>/coordinators/ids/[serverId]
+ */
+ public static final class CoordinatorIdZNode {
+ public static String path(String serverId) {
+ return CoordinatorIdsZNode.path() + "/" + serverId;
+ }
+ }
+
+ /**
+ * The znode for the coordinator leader election. The znode path is:
+ *
+ * <p>/coordinators/election
+ */
+ public static final class CoordinatorElectionZNode {
+ public static String path() {
+ return "/coordinators/election";
+ }
+ }
+
+ /**
+ * The znode for the active coordinator leader. The znode path is:
*
- * <p>Note: introduce standby coordinators in the future for znode "/coordinators/standby/".
+ * <p>/coordinators/leader
*/
- public static final class CoordinatorZNode {
+ public static final class CoordinatorLeaderZNode {
public static String path() {
return "/coordinators/active";
}
Review Comment:
`CoordinatorLeaderZNode` JavaDoc says the znode path is
`/coordinators/leader`, but `path()` currently returns `/coordinators/active`.
This mismatch is likely to confuse operators and can lead to clients
reading/writing different znodes if they follow the documentation. Please
either update the JavaDoc to match the actual path (if keeping backward
compatibility), or change `path()` to return `/coordinators/leader` (and handle
migration/compat if needed).
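One way to keep the two from drifting again is to hold the path in a single constant and let the JavaDoc render it with the standard `{@value}` tag. The sketch below illustrates the backward-compatible option only (keeping `/coordinators/active`); the class name `CoordinatorLeaderZNodeSketch` and the `PATH` field are hypothetical and not taken from the PR.

```java
/** Hypothetical variant of CoordinatorLeaderZNode with one source of truth for the path. */
final class CoordinatorLeaderZNodeSketch {

    /** Kept at the pre-existing value so already-deployed clusters keep resolving the same znode. */
    static final String PATH = "/coordinators/active";

    /**
     * The znode for the active coordinator leader. The znode path is:
     *
     * <p>{@value #PATH}
     */
    static String path() {
        return PATH;
    }
}
```

If the path is renamed to `/coordinators/leader` instead, only the constant changes and the generated documentation follows it.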
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -170,10 +176,38 @@ public static void main(String[] args) {
@Override
protected void startServices() throws Exception {
+ electCoordinatorLeaderAsync();
+ }
+
+ private CompletableFuture<Void> electCoordinatorLeaderAsync() throws Exception {
+ this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
+
+ // Coordinator Server supports high availability. If 3 coordinator servers are alive,
+ // one of them will be elected as leader and the other two will be standby.
+ // When leader fails, one of standby coordinators will be elected as new leader.
+ registerCoordinatorServer();
+ ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
+ zkClient, this::registerCoordinatorServer, this);
+
+ // standby
+ this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId);
+ this.leaderElectionFuture =
+ coordinatorLeaderElection.startElectLeaderAsync(
+ () -> {
+ try {
+ startCoordinatorLeaderService();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ return leaderElectionFuture;
+ }
Review Comment:
`startServices()` now returns immediately after starting leader election, so
`CoordinatorServer.start()` can return before RPC/metrics/event-processor are
initialized (previously they were started synchronously). Startup failures in
`startCoordinatorLeaderService()` therefore become asynchronous and are not
reported via `ServerBase.start()` (which only observes exceptions thrown from
`startServices()`). Consider blocking in `startServices()` until either (a)
this node becomes leader and leader services have started, or (b) leader
election has definitively failed; alternatively, add explicit readiness/failure
propagation so callers don't treat the server as started while it's still
waiting/electing.
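A rough sketch of the readiness/failure propagation idea, using only `java.util.concurrent`. `StartupPropagationSketch` and `awaitLeaderServices` are hypothetical names; the future stands in for `leaderElectionFuture`. The bounded wait is an assumption made here so that a standby node does not block forever, and whether a timeout should count as "started as standby" is a design decision left to the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper; not part of the PR. */
final class StartupPropagationSketch {

    /**
     * Waits up to timeoutMs for the leader services to come up.
     *
     * @return true if leader services started, false if the node is still
     *     waiting for leadership when the timeout expires
     * @throws Exception the original failure thrown while starting leader services
     */
    static boolean awaitLeaderServices(CompletableFuture<Void> leaderReady, long timeoutMs)
            throws Exception {
        try {
            leaderReady.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Not elected yet: report "standby" instead of failing startup.
            return false;
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the real startup failure synchronously.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }
}
```

Called at the end of `startServices()`, this would surface an exception from `startCoordinatorLeaderService()` through the synchronous start path instead of leaving it inside the future.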
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes(new/delete) in zookeeper. */
+public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CoordinatorChangeWatcher.class);
+
+ public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) {
+ super(zooKeeperClient, eventManager, ZkData.CoordinatorIdsZNode.path());
+ }
+
+ @Override
+ protected CuratorCacheListener createListener() {
+ return new CoordinatorChangeListener();
+ }
+
+ @Override
+ protected String getWatcherName() {
+ return "CoordinatorChangeWatcher";
+ }
+
+ protected String getServerIdFromEvent(ChildData data) {
+ try {
+ return ZKPaths.getNodeFromPath(data.getPath());
+ } catch (NumberFormatException e) {
+ throw new FlussRuntimeException(
+ "Invalid server id in zookeeper path: " + data.getPath(), e);
+ }
+ }
+
+ private final class CoordinatorChangeListener implements CuratorCacheListener {
+
+ @Override
+ public void event(Type type, ChildData oldData, ChildData newData) {
+ if (newData != null) {
+ LOG.debug("Received {} event (path: {})", type, newData.getPath());
+ } else {
+ LOG.debug("Received {} event", type);
+ }
+
+ switch (type) {
+ case NODE_CREATED:
+ {
+ if (newData != null && newData.getData().length > 0) {
+ String serverId = getServerIdFromEvent(newData);
+ LOG.info("Received CHILD_ADDED event for server {}.", serverId);
+ eventManager.put(new NewCoordinatorEvent(serverId));
+ }
+ break;
+ }
+ case NODE_DELETED:
+ {
+ if (oldData != null && oldData.getData().length > 0) {
+ String serverId = getServerIdFromEvent(oldData);
+ LOG.info("Received CHILD_REMOVED event for server {}.", serverId);
+ eventManager.put(new DeadCoordinatorEvent(serverId));
+ }
Review Comment:
`CoordinatorChangeWatcher` ignores create/delete events unless
`ChildData.getData().length > 0`. But coordinator server registration nodes are
created with no payload (`ZooKeeperClient.registerCoordinatorServer` uses
`forPath(path)`), so `length` will be 0 (and `getData()` may even be null).
As a result, `NewCoordinatorEvent`/`DeadCoordinatorEvent` are never emitted,
and `getData().length` can throw a `NullPointerException`. Consider removing
the data-length gate (use the id from the path), or writing a non-empty payload
when registering coordinator servers, and always null-check `getData()`.
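If the data-length gate is dropped, the listener still needs some way to skip events that are not for a registration node, for example if the cache also reports the watched parent node itself. A path-based check is one option. The sketch below uses plain strings so it runs without Curator; `CoordinatorEventSketch`, `isDirectChild`, and `serverIdFromPath` are hypothetical names, and `serverIdFromPath` only approximates what `ZKPaths.getNodeFromPath` is used for in the PR.

```java
/** Hypothetical path helpers; not part of the PR. */
final class CoordinatorEventSketch {

    /** True only for paths of the form parentPath + "/" + id, with a non-empty id and no deeper nesting. */
    static boolean isDirectChild(String parentPath, String path) {
        String prefix = parentPath + "/";
        return path.startsWith(prefix)
                && path.length() > prefix.length()
                && path.indexOf('/', prefix.length()) < 0;
    }

    /** Returns the last path segment, which is the server id for a registration node. */
    static String serverIdFromPath(String path) {
        int idx = path.lastIndexOf('/');
        return idx < 0 ? path : path.substring(idx + 1);
    }
}
```

With a check like this, the `NODE_CREATED` and `NODE_DELETED` branches would depend only on `getPath()`, so a null or empty payload no longer matters.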
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes(new/delete) in zookeeper. */
+public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CoordinatorChangeWatcher.class);
+
+ public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) {
+ super(zooKeeperClient, eventManager, ZkData.CoordinatorIdsZNode.path());
+ }
+
+ @Override
+ protected CuratorCacheListener createListener() {
+ return new CoordinatorChangeListener();
+ }
+
+ @Override
+ protected String getWatcherName() {
+ return "CoordinatorChangeWatcher";
+ }
+
+ protected String getServerIdFromEvent(ChildData data) {
+ try {
+ return ZKPaths.getNodeFromPath(data.getPath());
+ } catch (NumberFormatException e) {
+ throw new FlussRuntimeException(
+ "Invalid server id in zookeeper path: " + data.getPath(), e);
+ }
Review Comment:
`getServerIdFromEvent()` catches `NumberFormatException`, but it doesn't
parse the server id (it only calls `ZKPaths.getNodeFromPath`). This catch block
appears to be leftover from the tablet-server watcher and makes the intent
unclear. Consider removing the try/catch or validating the server id format
explicitly if needed.
```suggestion
return ZKPaths.getNodeFromPath(data.getPath());
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. */
+public class CoordinatorLeaderElection implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class);
+
+ private final String serverId;
+ private final LeaderLatch leaderLatch;
+ private final AtomicBoolean isLeader = new AtomicBoolean(false);
+ private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>();
+ private volatile Thread electionThread;
+
+ public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) {
+ this.serverId = serverId;
+ this.leaderLatch =
+ new LeaderLatch(
+ zkClient.getCuratorClient(),
+ ZkData.CoordinatorElectionZNode.path(),
+ String.valueOf(serverId));
+ }
+
+ /**
+ * Starts the leader election process asynchronously. The returned future completes when this
+ * server becomes the leader and initializes the leader services.
+ *
+ * @param initLeaderServices the runnable to initialize leader services once elected
+ * @return a CompletableFuture that completes when this server becomes leader
+ */
+ public CompletableFuture<Void> startElectLeaderAsync(Runnable initLeaderServices) {
+ leaderLatch.addListener(
+ new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ LOG.info("Coordinator server {} has become the leader.", serverId);
+ isLeader.set(true);
+ }
+
+ @Override
+ public void notLeader() {
+ relinquishLeadership();
Review Comment:
`CoordinatorLeaderElection.notLeader()` calls `relinquishLeadership()`,
which closes the `LeaderLatch` and completes the election future exceptionally.
This effectively removes the server from future elections on transient
disconnects or leadership loss, and for standby nodes it can prevent them from
ever becoming leader later. Instead of closing the latch on `notLeader`,
consider keeping the latch running and only stopping/cleaning up leader-only
services when leadership is lost; if the process should exit on fencing,
explicitly propagate that to `CoordinatorServer` so it shuts down.
```suggestion
isLeader.set(false);
```
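The one-line suggestion only flips the flag. A slightly fuller sketch of "keep the latch running, stop only leader-only services" could look like the following. It is self-contained and does not touch Curator: `LeadershipListenerSketch` and `LeaderServices` are hypothetical, and the two callback methods merely mirror the `isLeader()`/`notLeader()` names of `LeaderLatchListener`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical listener logic; the real code would live inside the LeaderLatchListener. */
final class LeadershipListenerSketch {

    /** Placeholder for the leader-only services (RPC, event processor, ...). */
    interface LeaderServices {
        void start();

        void stop();
    }

    private final AtomicBoolean leader = new AtomicBoolean(false);
    private final LeaderServices services;

    LeadershipListenerSketch(LeaderServices services) {
        this.services = services;
    }

    /** Called when leadership is gained: start leader services exactly once per term. */
    void isLeader() {
        if (leader.compareAndSet(false, true)) {
            services.start();
        }
    }

    /**
     * Called when leadership is lost: stop leader services but leave the latch
     * alone, so this node stays a candidate. A standby that never led is a no-op.
     */
    void notLeader() {
        if (leader.compareAndSet(true, false)) {
            services.stop();
        }
    }

    boolean hasLeadership() {
        return leader.get();
    }
}
```

Whether the leader services in this PR can actually be stopped and started again within one process is not something the diff shows; if they cannot, propagating the loss to `CoordinatorServer` for a full shutdown remains the alternative.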
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.