This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 842c540 [Improvement][Server] Must restart master if Zk reconnect
(#5210) (#5211)
842c540 is described below
commit 842c5400e605a8b8eb0d8fdc78701f10222063fd
Author: ruanwenjun <[email protected]>
AuthorDate: Tue May 25 05:03:29 2021 +0800
[Improvement][Server] Must restart master if Zk reconnect (#5210) (#5211)
---
.../server/master/registry/MasterRegistry.java | 2 --
.../server/worker/registry/WorkerRegistry.java | 2 --
.../dolphinscheduler/service/zk/ZookeeperOperator.java | 12 +++++++-----
3 files changed, 7 insertions(+), 9 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 09e9e22..07b2f82 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -90,10 +90,8 @@ public class MasterRegistry {
logger.error("master : {} connection lost from zookeeper",
address);
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("master : {} reconnected to zookeeper",
address);
-
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath,
"");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("master : {} connection SUSPENDED ", address);
-
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath,
"");
}
});
int masterHeartbeatInterval =
masterConfig.getMasterHeartbeatInterval();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 0ef9956..a045cc9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -111,10 +111,8 @@ public class WorkerRegistry {
logger.error("worker : {} connection lost from
zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("worker : {} reconnected to zookeeper",
address);
-
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ",
address);
-
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
}
});
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index 6652f87..ebe061e 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -189,18 +189,20 @@ public class ZookeeperOperator implements
InitializingBean {
}
}
- public void persistEphemeral(final String key, final String value) {
+ public void persistEphemeral(final String path, final String value) {
try {
- if (isExisted(key)) {
+ // If the ephemeral node exist and the data is not equals to the
given value
+ // delete the old node
+ if (isExisted(path) && !value.equals(get(path))) {
try {
- zkClient.delete().deletingChildrenIfNeeded().forPath(key);
+ zkClient.delete().deletingChildrenIfNeeded().forPath(path);
} catch (NoNodeException ignore) {
//NOP
}
}
-
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key,
value.getBytes(StandardCharsets.UTF_8));
+
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,
value.getBytes(StandardCharsets.UTF_8));
} catch (final Exception ex) {
- logger.error("persistEphemeral key : {} , value : {}", key, value,
ex);
+ logger.error("persistEphemeral path : {} , value : {}", path,
value, ex);
}
}