swuferhong commented on code in PR #1401:
URL: https://github.com/apache/fluss/pull/1401#discussion_r2451137605
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerde.java:
##########
@@ -51,7 +51,7 @@ public void serialize(CoordinatorAddress coordinatorAddress,
JsonGenerator gener
throws IOException {
generator.writeStartObject();
writeVersion(generator);
- generator.writeStringField(ID, coordinatorAddress.getId());
+ generator.writeNumberField(ID, coordinatorAddress.getId());
Review Comment:
Don't change this directly; backward compatibility must be preserved. Older
versions store the ID as a JSON string (`"id":"0"`), whereas the new version
stores it as a JSON number (`"id":0`). I believe the current code `int id =
node.get(ID).asInt()` would throw an exception when it encounters `"id":"0"`
from older versions, since that value is a JSON string, not a number.
The correct approach is to bump the schema version to `3` and handle
deserialization conditionally based on the version:
- For version `2`: read the "id" field as a string, then parse it to an integer.
- For version `3`: read the "id" field directly as an integer.
Additionally, compatibility tests must be added to verify this.
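A minimal sketch of the version-based branching described above. It is not the actual serde: a plain `Map` stands in for Jackson's `JsonNode` so the example stays self-contained, and the class name, the `"version"` field name, and the treatment of versions before `2` are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of version-aware decoding for the coordinator "id" field. */
final class CoordinatorIdCompat {
    // Assumed version numbers, following the review comment.
    static final int VERSION_WITH_STRING_ID = 2;
    static final int VERSION_WITH_INT_ID = 3;

    /** Builds a parsed-JSON stand-in; real code would receive a Jackson JsonNode. */
    static Map<String, Object> node(int version, Object id) {
        Map<String, Object> node = new HashMap<>();
        node.put("version", version);
        node.put("id", id);
        return node;
    }

    /** Reads the id according to the schema version stored next to it. */
    static int readId(Map<String, Object> node) {
        int version = (Integer) node.get("version");
        Object rawId = node.get("id");
        if (version == VERSION_WITH_INT_ID) {
            // Version 3: the id is a JSON number.
            return ((Number) rawId).intValue();
        }
        if (version <= VERSION_WITH_STRING_ID) {
            // Version 2 (and, by assumption, earlier): the id is a JSON string.
            return Integer.parseInt((String) rawId);
        }
        throw new IllegalArgumentException("Unsupported version: " + version);
    }

    public static void main(String[] args) {
        System.out.println(readId(node(2, "0"))); // old layout
        System.out.println(readId(node(3, 0)));   // new layout
    }
}
```

A compatibility test could feed both layouts through the real serde in the same way and assert that they decode to the same `CoordinatorAddress`.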
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -54,6 +54,7 @@ public class CoordinatorContext {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorContext.class);
public static final int INITIAL_COORDINATOR_EPOCH = 0;
+ public static final int INITIAL_COORDINATOR_EPOCH_ZKVERSION = 0;
Review Comment:
`INITIAL_COORDINATOR_EPOCH_ZKVERSION` ->
`INITIAL_COORDINATOR_EPOCH_ZK_VERSION`
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -272,6 +298,41 @@ protected CompletableFuture<Result> closeAsync(Result result) {
return terminationFuture;
}
+ private void registerCoordinatorServer() throws Exception {
+ long startTime = System.currentTimeMillis();
+
+ // we need to retry to register since although
Review Comment:
Format it.
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java:
##########
@@ -63,10 +67,20 @@ protected ServerBase getStartFailServer() {
protected void checkAfterStartServer() throws Exception {
assertThat(coordinatorServer.getRpcServer()).isNotNull();
// check the data put in zk after coordinator server start
- Optional<CoordinatorAddress> optCoordinatorAddr = zookeeperClient.getCoordinatorAddress();
+ Optional<CoordinatorAddress> optCoordinatorAddr =
+ zookeeperClient.getCoordinatorLeaderAddress();
assertThat(optCoordinatorAddr).isNotEmpty();
verifyEndpoint(
optCoordinatorAddr.get().getEndpoints(),
coordinatorServer.getRpcServer().getBindEndpoints());
}
+
+ public void waitUtilCoordinatorServerElected() {
+ waitUntil(
+ () -> {
+ return zookeeperClient.getCoordinatorLeaderAddress().isPresent();
Review Comment:
Ditto: replace with an expression lambda.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcher.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event.watcher;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.server.coordinator.event.DeadCoordinatorServerEvent;
+import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.NewCoordinatorServerEvent;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A watcher to watch the coordinator server changes(new/delete) in zookeeper. */
+public class CoordinatorServerChangeWatcher {
+
Review Comment:
Rename to `CoordinatorChangeWatcher`? I found classes related to the
`coordinatorServer` are all named `CoordinatorXxxx`.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -213,8 +239,8 @@ protected void startServices() throws Exception {
registerCoordinatorLeader();
// when init session, register coordinator server again
- ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
- zkClient, this::registerCoordinatorLeader, this);
+ // ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
+ // zkClient, this::registerCoordinatorLeader, this);
Review Comment:
Why was this removed?
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -161,20 +162,104 @@ public Optional<byte[]> getOrEmpty(String path) throws Exception {
// Coordinator server
// --------------------------------------------------------------------------------------------
- /** Register a coordinator leader server to ZK. */
+ /** Register a coordinator server to ZK. */
+ public void registerCoordinatorServer(int coordinatorId) throws Exception {
+ String path = ZkData.CoordinatorIdZNode.path(coordinatorId);
+ zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
+ LOG.info("Registered Coordinator server {} at path {}.", coordinatorId, path);
+ }
+
+ /**
+ * Become coordinator leader. This method is a step after electCoordinatorLeader() and before
+ * registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator
+ * epoch and coordinator epoch zk version.
+ */
+ public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) throws Exception {
+ try {
+ ensureEpochZnodeExists();
+
+ try {
+ Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
+ int currentEpoch = getEpoch.f0;
+ int currentVersion = getEpoch.f1;
+ int newEpoch = currentEpoch + 1;
+ LOG.info(
+ "Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
+ coordinatorId,
+ currentEpoch,
+ currentVersion,
+ newEpoch);
+
+ // atomically update epoch
+ zkClient.setData()
+ .withVersion(currentVersion)
+ .forPath(
+ ZkData.CoordinatorEpochZNode.path(),
+ ZkData.CoordinatorEpochZNode.encode(newEpoch));
+
+ return Optional.of(newEpoch);
+ } catch (KeeperException.BadVersionException e) {
+ // Other coordinator leader has updated epoch.
+ // If this happens, it means our fence is in effect.
+ LOG.info("Coordinator leader {} failed to update epoch.", coordinatorId);
+ }
+ } catch (KeeperException.NodeExistsException e) {
+ }
+ return Optional.empty();
+ }
+
+ /** Register a coordinator leader to ZK. */
public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) throws Exception {
- String path = CoordinatorZNode.path();
+ String path = ZkData.CoordinatorLeaderZNode.path();
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
- .forPath(path, CoordinatorZNode.encode(coordinatorAddress));
- LOG.info("Registered leader {} at path {}.", coordinatorAddress, path);
+ .forPath(path, ZkData.CoordinatorLeaderZNode.encode(coordinatorAddress));
+ LOG.info("Registered Coordinator leader {} at path {}.", coordinatorAddress, path);
}
/** Get the leader address registered in ZK. */
- public Optional<CoordinatorAddress> getCoordinatorAddress() throws Exception {
- Optional<byte[]> bytes = getOrEmpty(CoordinatorZNode.path());
- return bytes.map(CoordinatorZNode::decode);
+ public Optional<CoordinatorAddress> getCoordinatorLeaderAddress() throws Exception {
+ Optional<byte[]> bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path());
+ return bytes.map(
+ data ->
+ // maybe a empty node when a leader is elected but not registered
Review Comment:
`maybe a` => `maybe an`
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -104,13 +106,41 @@ public class CoordinatorContext {
private ServerInfo coordinatorServerInfo = null;
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
+ private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZKVERSION;
public CoordinatorContext() {}
public int getCoordinatorEpoch() {
return coordinatorEpoch;
}
+ public int getCoordinatorEpochZkVersion() {
+ return coordinatorEpochZkVersion;
+ }
+
+ public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) {
+ this.coordinatorEpoch = newEpoch;
Review Comment:
It seems `newZkVersion` is never updated from the value read from
ZooKeeper?
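One way to address this, sketched under assumptions: have the fencing step return the new epoch together with the znode version produced by the write, so the caller can pass both to `setCoordinatorEpochAndZkVersion`. The sketch uses an in-memory stand-in instead of ZooKeeper, and every class and method name in it is hypothetical. With Curator, `setData().withVersion(...).forPath(...)` returns a `Stat`, and `Stat.getVersion()` should be the post-write version.

```java
import java.util.Optional;

/** Hypothetical sketch: return the epoch together with the znode version produced by the write. */
final class EpochFenceSketch {

    /** In-memory stand-in for a znode whose version increases by one on every successful write. */
    static final class VersionedNode {
        private int epoch = 0;
        private int version = 0;

        synchronized int[] read() {
            return new int[] {epoch, version};
        }

        /** Conditional write: returns the new version, or -1 if expectedVersion is stale. */
        synchronized int setIfVersion(int newEpoch, int expectedVersion) {
            if (version != expectedVersion) {
                return -1;
            }
            epoch = newEpoch;
            version++;
            return version;
        }
    }

    /** Pair of values the coordinator context would store after winning the fence. */
    static final class EpochAndVersion {
        final int epoch;
        final int zkVersion;

        EpochAndVersion(int epoch, int zkVersion) {
            this.epoch = epoch;
            this.zkVersion = zkVersion;
        }
    }

    /** Tries to increment the epoch; empty if another writer got there first. */
    static Optional<EpochAndVersion> tryBumpEpoch(VersionedNode node) {
        int[] current = node.read();
        int newEpoch = current[0] + 1;
        int newVersion = node.setIfVersion(newEpoch, current[1]);
        if (newVersion < 0) {
            return Optional.empty();
        }
        return Optional.of(new EpochAndVersion(newEpoch, newVersion));
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode();
        EpochAndVersion first = tryBumpEpoch(node).get();
        System.out.println("epoch=" + first.epoch + ", zkVersion=" + first.zkVersion);
    }
}
```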
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -104,13 +106,41 @@ public class CoordinatorContext {
private ServerInfo coordinatorServerInfo = null;
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
+ private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZKVERSION;
Review Comment:
Does `coordinatorEpochZkVersion` not need to be persisted in ZooKeeper? If it
does, shouldn't we design the ZkData structure for it up front in this PR?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java:
##########
@@ -104,13 +106,41 @@ public class CoordinatorContext {
private ServerInfo coordinatorServerInfo = null;
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
+ private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZKVERSION;
public CoordinatorContext() {}
public int getCoordinatorEpoch() {
return coordinatorEpoch;
}
+ public int getCoordinatorEpochZkVersion() {
+ return coordinatorEpochZkVersion;
+ }
+
+ public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) {
+ this.coordinatorEpoch = newEpoch;
+ this.coordinatorEpochZkVersion = newZkVersion;
+ }
+
+ public Set<Integer> getLiveCoordinatorServers() {
+ return liveCoordinatorServers;
+ }
+
+ @VisibleForTesting
Review Comment:
Is this needed? No test calls this method.
##########
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:
##########
@@ -832,6 +834,15 @@ public CoordinatorServer getCoordinatorServer() {
return coordinatorServer;
}
+ public void waitUtilCoordinatorServerElected() {
+ waitUntil(
+ () -> {
+ return zooKeeperClient.getCoordinatorLeaderAddress().isPresent();
Review Comment:
Replace with expression lambda.
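For illustration, the two forms side by side. `getCoordinatorLeaderAddress()` here is a hypothetical stand-in for the `zooKeeperClient` call, and `BooleanSupplier` is only assumed to resemble whatever `waitUntil` accepts.

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;

final class ExpressionLambdaSketch {
    /** Hypothetical stand-in for zooKeeperClient.getCoordinatorLeaderAddress(). */
    static Optional<String> getCoordinatorLeaderAddress() {
        return Optional.of("localhost:9123");
    }

    // Statement (block) lambda, as written in the PR.
    static final BooleanSupplier BLOCK_FORM =
            () -> {
                return getCoordinatorLeaderAddress().isPresent();
            };

    // Equivalent expression lambda: the body is the returned expression itself.
    static final BooleanSupplier EXPRESSION_FORM =
            () -> getCoordinatorLeaderAddress().isPresent();

    public static void main(String[] args) {
        System.out.println(BLOCK_FORM.getAsBoolean() == EXPRESSION_FORM.getAsBoolean());
    }
}
```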
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -161,20 +162,104 @@ public Optional<byte[]> getOrEmpty(String path) throws Exception {
// Coordinator server
// --------------------------------------------------------------------------------------------
- /** Register a coordinator leader server to ZK. */
+ /** Register a coordinator server to ZK. */
+ public void registerCoordinatorServer(int coordinatorId) throws Exception {
+ String path = ZkData.CoordinatorIdZNode.path(coordinatorId);
+ zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
+ LOG.info("Registered Coordinator server {} at path {}.", coordinatorId, path);
+ }
+
+ /**
+ * Become coordinator leader. This method is a step after electCoordinatorLeader() and before
+ * registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator
+ * epoch and coordinator epoch zk version.
+ */
+ public Optional<Integer> fenceBecomeCoordinatorLeader(int coordinatorId) throws Exception {
+ try {
+ ensureEpochZnodeExists();
+
+ try {
+ Tuple2<Integer, Integer> getEpoch = getCurrentEpoch();
+ int currentEpoch = getEpoch.f0;
+ int currentVersion = getEpoch.f1;
+ int newEpoch = currentEpoch + 1;
+ LOG.info(
+ "Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}",
+ coordinatorId,
+ currentEpoch,
+ currentVersion,
+ newEpoch);
+
+ // atomically update epoch
+ zkClient.setData()
+ .withVersion(currentVersion)
+ .forPath(
+ ZkData.CoordinatorEpochZNode.path(),
+ ZkData.CoordinatorEpochZNode.encode(newEpoch));
+
+ return Optional.of(newEpoch);
+ } catch (KeeperException.BadVersionException e) {
+ // Other coordinator leader has updated epoch.
+ // If this happens, it means our fence is in effect.
+ LOG.info("Coordinator leader {} failed to update epoch.", coordinatorId);
+ }
+ } catch (KeeperException.NodeExistsException e) {
+ }
Review Comment:
Why catch this error but do nothing?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java:
##########
@@ -62,6 +62,7 @@ public final class CoordinatorEventManager implements EventManager {
private Histogram eventQueueTime;
// Coordinator metrics moved from CoordinatorEventProcessor
+ private volatile int aliveCoordinatorServerCount;
Review Comment:
`aliveCoordinatorServerCount` is never assigned a value now?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -248,7 +257,7 @@ public void shutdown() {
private ServerInfo getCoordinatorServerInfo() {
try {
return zooKeeperClient
- .getCoordinatorAddress()
+ .getCoordinatorLeaderAddress()
.map(
coordinatorAddress ->
// TODO we set id to 0 as that
CoordinatorServer don't support
Review Comment:
Shouldn't this logic be updated so that we no longer pass coordinatorServer
ID as `0`?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcher.java:
##########
@@ -0,0 +1,110 @@
+/** A watcher to watch the coordinator server changes(new/delete) in zookeeper. */
+public class CoordinatorServerChangeWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServerChangeWatcher.class);
Review Comment:
Do we need to introduce the class `CoordinatorServerChangeWatcher`? Its code
is almost identical to that of `TabletServerChangeWatcher`, differing only in
the input `zkPath`. Could we instead abstract `TabletServerChangeWatcher` into
a common `ServerChangeWatcher`?
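A rough sketch of what such an abstraction could look like. It leaves out Curator entirely: the base class takes the watched path and leaves event creation to subclasses. The znode paths, the tablet-server event names, and the string-based events are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

final class ServerChangeWatcherSketch {

    /** Shared watcher logic; subclasses only supply the watched path and the event names. */
    abstract static class ServerChangeWatcher {
        private final String zkPath;
        // Stand-in for EventManager: collected events are kept as strings.
        final List<String> events = new ArrayList<>();

        ServerChangeWatcher(String zkPath) {
            this.zkPath = zkPath;
        }

        abstract String newServerEvent(int serverId);

        abstract String deadServerEvent(int serverId);

        void onNodeCreated(String path) {
            if (isChild(path)) {
                events.add(newServerEvent(serverId(path)));
            }
        }

        void onNodeDeleted(String path) {
            if (isChild(path)) {
                events.add(deadServerEvent(serverId(path)));
            }
        }

        private boolean isChild(String path) {
            return path.startsWith(zkPath + "/");
        }

        private static int serverId(String path) {
            return Integer.parseInt(path.substring(path.lastIndexOf('/') + 1));
        }
    }

    static final class TabletServerChangeWatcher extends ServerChangeWatcher {
        TabletServerChangeWatcher() {
            super("/tabletservers/ids"); // assumed path
        }

        @Override
        String newServerEvent(int id) {
            return "NewTabletServerEvent(" + id + ")";
        }

        @Override
        String deadServerEvent(int id) {
            return "DeadTabletServerEvent(" + id + ")";
        }
    }

    static final class CoordinatorChangeWatcher extends ServerChangeWatcher {
        CoordinatorChangeWatcher() {
            super("/coordinators/ids"); // assumed path
        }

        @Override
        String newServerEvent(int id) {
            return "NewCoordinatorServerEvent(" + id + ")";
        }

        @Override
        String deadServerEvent(int id) {
            return "DeadCoordinatorServerEvent(" + id + ")";
        }
    }

    public static void main(String[] args) {
        ServerChangeWatcher watcher = new CoordinatorChangeWatcher();
        watcher.onNodeCreated("/coordinators/ids/2");
        watcher.onNodeDeleted("/coordinators/ids/2");
        System.out.println(watcher.events);
    }
}
```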
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -270,6 +301,24 @@ public static CoordinatorAddress decode(byte[] json) {
}
}
+ /**
+ * The znode for the coordinator epoch. The znode path is:
+ *
+ * <p>/coordinators/epoch
+ */
+ public static final class CoordinatorEpochZNode {
+ public static String path() {
+ return "/coordinators/epoch";
+ }
+
+ public static byte[] encode(int epoch) {
+ return String.valueOf(epoch).getBytes();
+ }
+
+ public static int decode(byte[] bytes) {
+ return Integer.parseInt(new String(bytes));
+ }
+ }
Review Comment:
I feel the design here is a bit too complicated. We probably don't need to
split `leader` and `epoch` into separate nodes:
1. `CoordinatorIdZNode` should store static node information, such as the
data currently in `CoordinatorAddress`.
2. `/coordinators/leader` and `/coordinators/epoch` should be merged into a
single node, e.g., leader, which holds data like `["id"=xxx, "epoch"=xxx]`.
Each update should read the previous epoch and then increment it accordingly.
3. We need to consider compatibility with the original version, in which
`CoordinatorAddress` is stored in `/coordinators/active`.
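To make point 2 concrete, here is a minimal sketch of a merged leader payload. The class name, the JSON layout, and the hand-rolled encoding are all hypothetical; real code would presumably go through the existing Jackson-based serde.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical payload of a single leader znode holding both the leader id and the epoch. */
final class CoordinatorLeaderDataSketch {
    private static final Pattern JSON = Pattern.compile("\\{\"id\":(\\d+),\"epoch\":(\\d+)\\}");

    final int id;
    final int epoch;

    CoordinatorLeaderDataSketch(int id, int epoch) {
        this.id = id;
        this.epoch = epoch;
    }

    /** A new leader takes over by reading the previous value and incrementing its epoch. */
    CoordinatorLeaderDataSketch succeededBy(int newLeaderId) {
        return new CoordinatorLeaderDataSketch(newLeaderId, epoch + 1);
    }

    byte[] encode() {
        String json = "{\"id\":" + id + ",\"epoch\":" + epoch + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    static CoordinatorLeaderDataSketch decode(byte[] bytes) {
        Matcher m = JSON.matcher(new String(bytes, StandardCharsets.UTF_8));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected leader data");
        }
        return new CoordinatorLeaderDataSketch(
                Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
    }

    public static void main(String[] args) {
        CoordinatorLeaderDataSketch previous = new CoordinatorLeaderDataSketch(0, 4);
        CoordinatorLeaderDataSketch next = decode(previous.succeededBy(1).encode());
        System.out.println("id=" + next.id + ", epoch=" + next.epoch);
    }
}
```

Writing the merged node with a version-conditioned update would keep the fencing behaviour of the current `fenceBecomeCoordinatorLeader`. The sketch also uses an explicit UTF-8 charset, whereas the PR's `encode`/`decode` rely on the platform default.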