This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.0.6-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.6-prepare by this push:
     new 57fff8c686 [Fix-13913][Master] syncMasterNodes does not make current 
slot correctly after zookeeper reconnect (#14014)
57fff8c686 is described below

commit 57fff8c6863684a9a51a7a5a62fa04cf9badee93
Author: Aaron Wang <[email protected]>
AuthorDate: Thu May 25 19:18:55 2023 +0800

    [Fix-13913][Master] syncMasterNodes does not make current slot correctly 
after zookeeper reconnect (#14014)
---
 .../master/registry/MasterConnectionStateListener.java      |  6 ++++--
 .../server/master/registry/MasterRegistryClient.java        | 13 +++++++++----
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
index bc1b217f9e..22ada4f02a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -32,10 +32,12 @@ public class MasterConnectionStateListener implements 
ConnectionListener {
 
     private final String masterNodePath;
     private final RegistryClient registryClient;
+    private final MasterHeartBeatTask masterHeartBeatTask;
 
-    public MasterConnectionStateListener(String masterNodePath, RegistryClient 
registryClient) {
+    public MasterConnectionStateListener(String masterNodePath, RegistryClient 
registryClient, MasterHeartBeatTask masterHeartBeatTask) {
         this.masterNodePath = checkNotNull(masterNodePath);
         this.registryClient = checkNotNull(registryClient);
+        this.masterHeartBeatTask = checkNotNull(masterHeartBeatTask);
     }
 
     @Override
@@ -50,7 +52,7 @@ public class MasterConnectionStateListener implements 
ConnectionListener {
             case RECONNECTED:
                 logger.debug("registry connection state is {}, clean the node 
info", state);
                 registryClient.remove(masterNodePath);
-                registryClient.persistEphemeral(masterNodePath, "");
+                registryClient.persistEphemeral(masterNodePath, 
masterHeartBeatTask.getHeartBeatInfo());
                 break;
             case DISCONNECTED:
                 logger.warn("registry connection state is {}, ready to stop 
myself", state);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 56167f65b4..f8ce97c0e3 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -79,6 +79,11 @@ public class MasterRegistryClient implements AutoCloseable {
      */
     private ScheduledExecutorService heartBeatExecutor;
 
+    /**
+     * master heartbeat task
+     */
+    private MasterHeartBeatTask masterHeartBeatTask;
+
     /**
      * master startup time, ms
      */
@@ -96,7 +101,7 @@ public class MasterRegistryClient implements AutoCloseable {
             // master registry
             registry();
             registryClient.addConnectionStateListener(new 
MasterConnectionStateListener(getCurrentNodePath(),
-                registryClient));
+                registryClient, masterHeartBeatTask));
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new 
MasterRegistryDataListener());
         } catch (Exception e) {
             throw new RegistryException("Master registry client start up 
error", e);
@@ -189,7 +194,7 @@ public class MasterRegistryClient implements AutoCloseable {
         logger.info("Master node : {} registering to registry center", 
masterAddress);
         String localNodePath = getCurrentNodePath();
         Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
-        MasterHeartBeatTask heartBeatTask = new 
MasterHeartBeatTask(startupTime,
+        masterHeartBeatTask = new MasterHeartBeatTask(startupTime,
             masterConfig.getMaxCpuLoadAvg(),
             masterConfig.getReservedMemory(),
             Sets.newHashSet(localNodePath),
@@ -197,7 +202,7 @@ public class MasterRegistryClient implements AutoCloseable {
 
         // remove before persist
         registryClient.remove(localNodePath);
-        registryClient.persistEphemeral(localNodePath, 
heartBeatTask.getHeartBeatInfo());
+        registryClient.persistEphemeral(localNodePath, 
masterHeartBeatTask.getHeartBeatInfo());
 
         while (!registryClient.checkNodeExists(NetUtils.getHost(), 
NodeType.MASTER)) {
             logger.warn("The current master server node:{} cannot find in 
registry", NetUtils.getHost());
@@ -210,7 +215,7 @@ public class MasterRegistryClient implements AutoCloseable {
         // delete dead server
         registryClient.handleDeadServer(Collections.singleton(localNodePath), 
NodeType.MASTER, Constants.DELETE_OP);
 
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, 
masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
+        this.heartBeatExecutor.scheduleAtFixedRate(masterHeartBeatTask, 0L, 
masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
         logger.info("Master node : {} registered to registry center 
successfully with heartBeatInterval : {}s", masterAddress, 
masterHeartbeatInterval);
 
     }

Reply via email to