swuferhong commented on code in PR #1401:
URL: https://github.com/apache/fluss/pull/1401#discussion_r2451137605


##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerde.java:
##########
@@ -51,7 +51,7 @@ public void serialize(CoordinatorAddress coordinatorAddress, 
JsonGenerator gener
             throws IOException {
         generator.writeStartObject();
         writeVersion(generator);
-        generator.writeStringField(ID, coordinatorAddress.getId());
+        generator.writeNumberField(ID, coordinatorAddress.getId());

Review Comment:
   Don't modify it directly; backward compatibility must be preserved. In older 
versions, the ID is stored as a string (`"id":"0"`), whereas in the new version 
it's stored as an integer (`"id":0`). I believe the current code `int id = 
node.get(ID).asInt()` would throw an exception when encountering `"id":"0"` 
from older versions, since it's a JSON string, not a number.
   
   The correct approach is to bump the schema version to `3` and handle 
deserialization conditionally based on the version:
   
   - For version `2`: read the `id` field as a string, then parse it to an integer.
   - For version `3`: read the `id` field directly as an integer.
   
   Additionally, compatibility tests must be added to verify this.
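
   A minimal sketch of the version-based branching, assuming the parsed JSON object is modeled as a plain `Map` (a stand-in for the real JSON node type) and that version `2` holds the string form while version `3` holds the integer form. The class and constant names are hypothetical. As far as I know, Jackson's `JsonNode.asInt()` coerces numeric text rather than throwing, so the exact failure mode is worth confirming with the compatibility test; the explicit branch keeps the contract clear either way.

```java
import java.util.Map;

/** Hypothetical helper; the parsed JSON object is modeled as a Map for illustration. */
class CoordinatorIdCompat {
    static final int VERSION_WITH_STRING_ID = 2; // assumed layout: "id":"0"
    static final int VERSION_WITH_INT_ID = 3;    // assumed layout: "id":0

    /** Reads the coordinator id according to the schema version of the payload. */
    static int readId(Map<String, Object> node) {
        int version = ((Number) node.get("version")).intValue();
        Object rawId = node.get("id");
        switch (version) {
            case VERSION_WITH_STRING_ID:
                // Old layout: the id was written as a JSON string.
                return Integer.parseInt((String) rawId);
            case VERSION_WITH_INT_ID:
                // New layout: the id is written as a JSON number.
                return ((Number) rawId).intValue();
            default:
                throw new IllegalArgumentException("Unsupported version: " + version);
        }
    }
}
```

   A compatibility test would then feed both an old-format and a new-format payload through the same read path and expect the same id.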



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -54,6 +54,7 @@ public class CoordinatorContext {
     private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorContext.class);
 
     public static final int INITIAL_COORDINATOR_EPOCH = 0;
+    public static final int INITIAL_COORDINATOR_EPOCH_ZKVERSION = 0;

Review Comment:
   `INITIAL_COORDINATOR_EPOCH_ZKVERSION` -> 
`INITIAL_COORDINATOR_EPOCH_ZK_VERSION`



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -272,6 +298,41 @@ protected CompletableFuture<Result> closeAsync(Result 
result) {
         return terminationFuture;
     }
 
+    private void registerCoordinatorServer() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // we need to retry to register since although

Review Comment:
   Format it.



##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java:
##########
@@ -63,10 +67,20 @@ protected ServerBase getStartFailServer() {
     protected void checkAfterStartServer() throws Exception {
         assertThat(coordinatorServer.getRpcServer()).isNotNull();
         // check the data put in zk after coordinator server start
-        Optional<CoordinatorAddress> optCoordinatorAddr = 
zookeeperClient.getCoordinatorAddress();
+        Optional<CoordinatorAddress> optCoordinatorAddr =
+                zookeeperClient.getCoordinatorLeaderAddress();
         assertThat(optCoordinatorAddr).isNotEmpty();
         verifyEndpoint(
                 optCoordinatorAddr.get().getEndpoints(),
                 coordinatorServer.getRpcServer().getBindEndpoints());
     }
+
+    public void waitUtilCoordinatorServerElected() {
+        waitUntil(
+                () -> {
+                    return 
zookeeperClient.getCoordinatorLeaderAddress().isPresent();

Review Comment:
   Ditto: replace with an expression lambda.
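
   For illustration, the two forms side by side. This is only a sketch: `leaderAddress()` is a hypothetical stand-in for `zookeeperClient.getCoordinatorLeaderAddress()`, and `BooleanSupplier` stands in for whatever functional interface `waitUntil` accepts.

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;

class LambdaStyle {
    // Hypothetical stand-in for zookeeperClient.getCoordinatorLeaderAddress().
    static Optional<String> leaderAddress() {
        return Optional.of("localhost:9123");
    }

    // Block (statement) lambda, as written in the diff.
    static final BooleanSupplier BLOCK =
            () -> {
                return leaderAddress().isPresent();
            };

    // Equivalent expression lambda: same behavior, no braces or return.
    static final BooleanSupplier EXPRESSION = () -> leaderAddress().isPresent();
}
```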



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcher.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorServerEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorServerEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes(new/delete) in 
zookeeper. */
+public class CoordinatorServerChangeWatcher {
+

Review Comment:
   Rename to `CoordinatorChangeWatcher`? The classes related to 
`CoordinatorServer` are all named `CoordinatorXxxx`.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -213,8 +239,8 @@ protected void startServices() throws Exception {
 
             registerCoordinatorLeader();
             // when init session, register coordinator server again
-            ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
-                    zkClient, this::registerCoordinatorLeader, this);
+            //            
ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
+            //                    zkClient, this::registerCoordinatorLeader, 
this);

Review Comment:
   Why was this commented out?



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -161,20 +162,104 @@ public Optional<byte[]> getOrEmpty(String path) throws 
Exception {
     // Coordinator server
     // 
--------------------------------------------------------------------------------------------
 
-    /** Register a coordinator leader server to ZK. */
+    /** Register a coordinator server to ZK. */
+    public void registerCoordinatorServer(int coordinatorId) throws Exception {
+        String path = ZkData.CoordinatorIdZNode.path(coordinatorId);
+        
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
+        LOG.info("Registered Coordinator server {} at path {}.", 
coordinatorId, path);
+    }
+
+    /**
+     * Become coordinator leader. This method is a step after 
electCoordinatorLeader() and before
+     * registerCoordinatorLeader(). This is to ensure the coordinator get and 
update the coordinator
+     * epoch and coordinator epoch zk version.
+     */
+    public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) 
throws Exception {
+        try {
+            ensureEpochZnodeExists();
+
+            try {
+                Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
+                int currentEpoch = getEpoch.f0;
+                int currentVersion = getEpoch.f1;
+                int newEpoch = currentEpoch + 1;
+                LOG.info(
+                        "Coordinator leader {} tries to update epoch. Current 
epoch={}, Zookeeper version={}, new epoch={}",
+                        coordinatorId,
+                        currentEpoch,
+                        currentVersion,
+                        newEpoch);
+
+                // atomically update epoch
+                zkClient.setData()
+                        .withVersion(currentVersion)
+                        .forPath(
+                                ZkData.CoordinatorEpochZNode.path(),
+                                ZkData.CoordinatorEpochZNode.encode(newEpoch));
+
+                return Optional.of(newEpoch);
+            } catch (KeeperException.BadVersionException e) {
+                // Other coordinator leader has updated epoch.
+                // If this happens, it means our fence is in effect.
+                LOG.info("Coordinator leader {} failed to update epoch.", 
coordinatorId);
+            }
+        } catch (KeeperException.NodeExistsException e) {
+        }
+        return Optional.empty();
+    }
+
+    /** Register a coordinator leader to ZK. */
     public void registerCoordinatorLeader(CoordinatorAddress 
coordinatorAddress) throws Exception {
-        String path = CoordinatorZNode.path();
+        String path = ZkData.CoordinatorLeaderZNode.path();
         zkClient.create()
                 .creatingParentsIfNeeded()
                 .withMode(CreateMode.EPHEMERAL)
-                .forPath(path, CoordinatorZNode.encode(coordinatorAddress));
-        LOG.info("Registered leader {} at path {}.", coordinatorAddress, path);
+                .forPath(path, 
ZkData.CoordinatorLeaderZNode.encode(coordinatorAddress));
+        LOG.info("Registered Coordinator leader {} at path {}.", 
coordinatorAddress, path);
     }
 
     /** Get the leader address registered in ZK. */
-    public Optional<CoordinatorAddress> getCoordinatorAddress() throws 
Exception {
-        Optional<byte[]> bytes = getOrEmpty(CoordinatorZNode.path());
-        return bytes.map(CoordinatorZNode::decode);
+    public Optional<CoordinatorAddress> getCoordinatorLeaderAddress() throws 
Exception {
+        Optional<byte[]> bytes = 
getOrEmpty(ZkData.CoordinatorLeaderZNode.path());
+        return bytes.map(
+                data ->
+                        // maybe a empty node when a leader is elected but not 
registered

Review Comment:
   `maybe a` => `maybe an`



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -104,13 +106,41 @@ public class CoordinatorContext {
 
     private ServerInfo coordinatorServerInfo = null;
     private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
+    private int coordinatorEpochZkVersion = 
INITIAL_COORDINATOR_EPOCH_ZKVERSION;
 
     public CoordinatorContext() {}
 
     public int getCoordinatorEpoch() {
         return coordinatorEpoch;
     }
 
+    public int getCoordinatorEpochZkVersion() {
+        return coordinatorEpochZkVersion;
+    }
+
+    public void setCoordinatorEpochAndZkVersion(int newEpoch, int 
newZkVersion) {
+        this.coordinatorEpoch = newEpoch;

Review Comment:
   It seems `newZkVersion` is never updated from the value obtained from 
ZooKeeper?
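
   One way to address this, sketched with an in-memory stand-in for the epoch znode (not the ZooKeeper or Curator API): have the conditional write return both the new epoch and the post-write version, so the caller can pass both to `setCoordinatorEpochAndZkVersion`. With Curator, I would expect the `Stat` returned by `setData().withVersion(...).forPath(...)` to carry that version via `getVersion()`, but please verify. All class and method names below are hypothetical.

```java
import java.util.Optional;

/** In-memory stand-in for a single versioned znode; illustrative only. */
class VersionedEpochNode {
    private int epoch = 0;
    private int version = 0;

    /** An epoch together with the node version observed alongside it. */
    static final class EpochAndVersion {
        final int epoch;
        final int zkVersion;

        EpochAndVersion(int epoch, int zkVersion) {
            this.epoch = epoch;
            this.zkVersion = zkVersion;
        }
    }

    synchronized EpochAndVersion read() {
        return new EpochAndVersion(epoch, version);
    }

    /** Conditional write: succeeds only if expectedVersion matches the current version. */
    synchronized Optional<EpochAndVersion> compareAndSet(int expectedVersion, int newEpoch) {
        if (expectedVersion != version) {
            return Optional.empty(); // another writer got there first
        }
        epoch = newEpoch;
        version++; // every successful write bumps the node version
        return Optional.of(new EpochAndVersion(epoch, version));
    }
}
```

   The caller would store `epoch` and `zkVersion` from the returned value rather than keeping the version it read before the write.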



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -104,13 +106,41 @@ public class CoordinatorContext {
 
     private ServerInfo coordinatorServerInfo = null;
     private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
+    private int coordinatorEpochZkVersion = 
INITIAL_COORDINATOR_EPOCH_ZKVERSION;

Review Comment:
   Doesn't `coordinatorEpochZkVersion` need to be persisted in ZooKeeper? If 
it does, shouldn't we design the ZkData structure for it in this PR up front?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -104,13 +106,41 @@ public class CoordinatorContext {
 
     private ServerInfo coordinatorServerInfo = null;
     private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
+    private int coordinatorEpochZkVersion = 
INITIAL_COORDINATOR_EPOCH_ZKVERSION;
 
     public CoordinatorContext() {}
 
     public int getCoordinatorEpoch() {
         return coordinatorEpoch;
     }
 
+    public int getCoordinatorEpochZkVersion() {
+        return coordinatorEpochZkVersion;
+    }
+
+    public void setCoordinatorEpochAndZkVersion(int newEpoch, int 
newZkVersion) {
+        this.coordinatorEpoch = newEpoch;
+        this.coordinatorEpochZkVersion = newZkVersion;
+    }
+
+    public Set<Integer> getLiveCoordinatorServers() {
+        return liveCoordinatorServers;
+    }
+
+    @VisibleForTesting

Review Comment:
   Is this needed? No test calls this method.



##########
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:
##########
@@ -832,6 +834,15 @@ public CoordinatorServer getCoordinatorServer() {
         return coordinatorServer;
     }
 
+    public void waitUtilCoordinatorServerElected() {
+        waitUntil(
+                () -> {
+                    return 
zooKeeperClient.getCoordinatorLeaderAddress().isPresent();

Review Comment:
   Replace with an expression lambda.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -161,20 +162,104 @@ public Optional<byte[]> getOrEmpty(String path) throws 
Exception {
     // Coordinator server
     // 
--------------------------------------------------------------------------------------------
 
-    /** Register a coordinator leader server to ZK. */
+    /** Register a coordinator server to ZK. */
+    public void registerCoordinatorServer(int coordinatorId) throws Exception {
+        String path = ZkData.CoordinatorIdZNode.path(coordinatorId);
+        
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
+        LOG.info("Registered Coordinator server {} at path {}.", 
coordinatorId, path);
+    }
+
+    /**
+     * Become coordinator leader. This method is a step after 
electCoordinatorLeader() and before
+     * registerCoordinatorLeader(). This is to ensure the coordinator get and 
update the coordinator
+     * epoch and coordinator epoch zk version.
+     */
+    public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) 
throws Exception {
+        try {
+            ensureEpochZnodeExists();
+
+            try {
+                Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
+                int currentEpoch = getEpoch.f0;
+                int currentVersion = getEpoch.f1;
+                int newEpoch = currentEpoch + 1;
+                LOG.info(
+                        "Coordinator leader {} tries to update epoch. Current 
epoch={}, Zookeeper version={}, new epoch={}",
+                        coordinatorId,
+                        currentEpoch,
+                        currentVersion,
+                        newEpoch);
+
+                // atomically update epoch
+                zkClient.setData()
+                        .withVersion(currentVersion)
+                        .forPath(
+                                ZkData.CoordinatorEpochZNode.path(),
+                                ZkData.CoordinatorEpochZNode.encode(newEpoch));
+
+                return Optional.of(newEpoch);
+            } catch (KeeperException.BadVersionException e) {
+                // Other coordinator leader has updated epoch.
+                // If this happens, it means our fence is in effect.
+                LOG.info("Coordinator leader {} failed to update epoch.", 
coordinatorId);
+            }
+        } catch (KeeperException.NodeExistsException e) {
+        }

Review Comment:
   Why catch this error but do nothing?
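
   If the intent is that "node already exists" is benign, it may be clearer to handle it right where the node is created and say so. A rough sketch using an in-memory map in place of ZooKeeper; `NodeExistsException` here is a local stand-in for `KeeperException.NodeExistsException`, and the other names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class EnsureNodeSketch {
    /** Local stand-in for KeeperException.NodeExistsException. */
    static final class NodeExistsException extends Exception {
        NodeExistsException(String path) {
            super("Node already exists: " + path);
        }
    }

    private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

    /** Models a create call that fails if the path is already present. */
    void create(String path, byte[] data) throws NodeExistsException {
        if (nodes.putIfAbsent(path, data) != null) {
            throw new NodeExistsException(path);
        }
    }

    /** Returns true if this call created the node, false if it already existed. */
    boolean ensureExists(String path, byte[] initial) {
        try {
            create(path, initial);
            return true;
        } catch (NodeExistsException e) {
            // Expected when another coordinator created the node first; safe to continue.
            System.out.println("Node " + path + " already exists, continuing.");
            return false;
        }
    }
}
```

   With the catch scoped to the create call, the outer method no longer needs an empty `catch` block.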



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java:
##########
@@ -62,6 +62,7 @@ public final class CoordinatorEventManager implements 
EventManager {
     private Histogram eventQueueTime;
 
     // Coordinator metrics moved from CoordinatorEventProcessor
+    private volatile int aliveCoordinatorServerCount;

Review Comment:
   `aliveCoordinatorServerCount` is never assigned a value now?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -248,7 +257,7 @@ public void shutdown() {
     private ServerInfo getCoordinatorServerInfo() {
         try {
             return zooKeeperClient
-                    .getCoordinatorAddress()
+                    .getCoordinatorLeaderAddress()
                     .map(
                             coordinatorAddress ->
                                     // TODO we set id to 0 as that 
CoordinatorServer don't support

Review Comment:
   Shouldn't this logic be updated so that we no longer pass coordinatorServer 
ID as `0`?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcher.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorServerEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorServerEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache;
+import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes(new/delete) in 
zookeeper. */
+public class CoordinatorServerChangeWatcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorServerChangeWatcher.class);

Review Comment:
   Do we need to introduce the class `CoordinatorServerChangeWatcher`? Its code 
is almost identical to that of `TabletServerChangeWatcher`, differing only in 
the input `zkPath`. Could we instead abstract `TabletServerChangeWatcher` into 
a common `ServerChangeWatcher`?
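
   Roughly what I have in mind, as a self-contained sketch without the Curator types: one watcher parameterized by the parent path and by factories for the "new" and "dead" events. The class shape, the `/coordinators/ids` path, and the string events are placeholders, not the existing Fluss classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/** Hypothetical generic watcher; E is the event type put on the queue. */
class ServerChangeWatcherSketch<E> {
    private final String parentPath;
    private final IntFunction<E> newServerEvent;
    private final IntFunction<E> deadServerEvent;
    private final List<E> events = new ArrayList<>();

    ServerChangeWatcherSketch(
            String parentPath, IntFunction<E> newServerEvent, IntFunction<E> deadServerEvent) {
        this.parentPath = parentPath;
        this.newServerEvent = newServerEvent;
        this.deadServerEvent = deadServerEvent;
    }

    /** Called when a znode is created. */
    void nodeCreated(String path) {
        handle(path, newServerEvent);
    }

    /** Called when a znode is deleted. */
    void nodeDeleted(String path) {
        handle(path, deadServerEvent);
    }

    private void handle(String path, IntFunction<E> factory) {
        String prefix = parentPath + "/";
        if (!path.startsWith(prefix)) {
            return; // not a child of the watched path
        }
        try {
            int serverId = Integer.parseInt(path.substring(prefix.length()));
            events.add(factory.apply(serverId));
        } catch (NumberFormatException e) {
            // Child name is not a server id; ignore it.
        }
    }

    List<E> events() {
        return events;
    }
}
```

   The tablet server and coordinator watchers would then differ only in the constructor arguments.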



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -270,6 +301,24 @@ public static CoordinatorAddress decode(byte[] json) {
         }
     }
 
+    /**
+     * The znode for the coordinator epoch. The znode path is:
+     *
+     * <p>/coordinators/epoch
+     */
+    public static final class CoordinatorEpochZNode {
+        public static String path() {
+            return "/coordinators/epoch";
+        }
+
+        public static byte[] encode(int epoch) {
+            return String.valueOf(epoch).getBytes();
+        }
+
+        public static int decode(byte[] bytes) {
+            return Integer.parseInt(new String(bytes));
+        }
+    }

Review Comment:
   I feel the design here is a bit too complicated; we probably don't need to 
split `leader` and `epoch` into separate nodes:
   1. `CoordinatorIdZNode` should store static node information, such 
as the data currently in `CoordinatorAddress`.
   2. `/coordinators/leader` and `/coordinators/epoch` should be 
merged into a single node, e.g., `leader`, which holds data like `["id"=xxx, 
"epoch"=xxx]`. Each update should read the previous epoch and then 
increment/update it accordingly.
   3. We need to consider compatibility with the original version, since 
`CoordinatorAddress` is stored in `/coordinators/active`.
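
   To make point 2 concrete, here is a small sketch of a merged leader node. An `AtomicReference` stands in for the znode, and its `compareAndSet` models a version-checked write. The payload class, the initial `(-1, 0)` value, and the method names are assumptions for illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;

class LeaderNodeSketch {
    /** Hypothetical payload of a merged leader znode: leader id plus epoch. */
    static final class LeaderAndEpoch {
        final int id;
        final int epoch;

        LeaderAndEpoch(int id, int epoch) {
            this.id = id;
            this.epoch = epoch;
        }
    }

    // Stand-in for the znode content; -1 means no leader has been elected yet.
    private final AtomicReference<LeaderAndEpoch> node =
            new AtomicReference<>(new LeaderAndEpoch(-1, 0));

    /** Reads the previous epoch and writes (newLeaderId, previousEpoch + 1) conditionally. */
    boolean tryTakeOver(int newLeaderId) {
        LeaderAndEpoch previous = node.get();
        LeaderAndEpoch next = new LeaderAndEpoch(newLeaderId, previous.epoch + 1);
        // Fails if another candidate replaced the content after we read it.
        return node.compareAndSet(previous, next);
    }

    LeaderAndEpoch current() {
        return node.get();
    }
}
```

   Because the id and the epoch change in the same write, a reader never sees a new leader paired with an old epoch.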



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

