This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 842c540  [Improvement][Server] Must restart master if Zk reconnect 
(#5210) (#5211)
842c540 is described below

commit 842c5400e605a8b8eb0d8fdc78701f10222063fd
Author: ruanwenjun <[email protected]>
AuthorDate: Tue May 25 05:03:29 2021 +0800

    [Improvement][Server] Must restart master if Zk reconnect (#5210) (#5211)
---
 .../server/master/registry/MasterRegistry.java               |  2 --
 .../server/worker/registry/WorkerRegistry.java               |  2 --
 .../dolphinscheduler/service/zk/ZookeeperOperator.java       | 12 +++++++-----
 3 files changed, 7 insertions(+), 9 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 09e9e22..07b2f82 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -90,10 +90,8 @@ public class MasterRegistry {
                     logger.error("master : {} connection lost from zookeeper", 
address);
                 } else if (newState == ConnectionState.RECONNECTED) {
                     logger.info("master : {} reconnected to zookeeper", 
address);
-                    
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
                 } else if (newState == ConnectionState.SUSPENDED) {
                     logger.warn("master : {} connection SUSPENDED ", address);
-                    
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, 
"");
                 }
             });
         int masterHeartbeatInterval = 
masterConfig.getMasterHeartbeatInterval();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 0ef9956..a045cc9 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -111,10 +111,8 @@ public class WorkerRegistry {
                         logger.error("worker : {} connection lost from 
zookeeper", address);
                     } else if (newState == ConnectionState.RECONNECTED) {
                         logger.info("worker : {} reconnected to zookeeper", 
address);
-                        
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, 
"");
                     } else if (newState == ConnectionState.SUSPENDED) {
                         logger.warn("worker : {} connection SUSPENDED ", 
address);
-                        
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, 
"");
                     }
                 });
             logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index 6652f87..ebe061e 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -189,18 +189,20 @@ public class ZookeeperOperator implements 
InitializingBean {
         }
     }
 
-    public void persistEphemeral(final String key, final String value) {
+    public void persistEphemeral(final String path, final String value) {
         try {
-            if (isExisted(key)) {
+            // If the ephemeral node exist and the data is not equals to the 
given value
+            // delete the old node
+            if (isExisted(path) && !value.equals(get(path))) {
                 try {
-                    zkClient.delete().deletingChildrenIfNeeded().forPath(key);
+                    zkClient.delete().deletingChildrenIfNeeded().forPath(path);
                 } catch (NoNodeException ignore) {
                     //NOP
                 }
             }
-            
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key,
 value.getBytes(StandardCharsets.UTF_8));
+            
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,
 value.getBytes(StandardCharsets.UTF_8));
         } catch (final Exception ex) {
-            logger.error("persistEphemeral key : {} , value : {}", key, value, 
ex);
+            logger.error("persistEphemeral path : {} , value : {}", path, 
value, ex);
         }
     }
 

Reply via email to