This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 3.0.6-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.6-prepare by this push:
new 57fff8c686 [Fix-13913][Master] syncMasterNodes does not make current
slot correctly after zookeeper reconnect (#14014)
57fff8c686 is described below
commit 57fff8c6863684a9a51a7a5a62fa04cf9badee93
Author: Aaron Wang <[email protected]>
AuthorDate: Thu May 25 19:18:55 2023 +0800
[Fix-13913][Master] syncMasterNodes does not make current slot correctly
after zookeeper reconnect (#14014)
---
.../master/registry/MasterConnectionStateListener.java | 6 ++++--
.../server/master/registry/MasterRegistryClient.java | 13 +++++++++----
2 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
index bc1b217f9e..22ada4f02a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -32,10 +32,12 @@ public class MasterConnectionStateListener implements ConnectionListener {
private final String masterNodePath;
private final RegistryClient registryClient;
+ private final MasterHeartBeatTask masterHeartBeatTask;
- public MasterConnectionStateListener(String masterNodePath, RegistryClient
registryClient) {
+ public MasterConnectionStateListener(String masterNodePath, RegistryClient
registryClient, MasterHeartBeatTask masterHeartBeatTask) {
this.masterNodePath = checkNotNull(masterNodePath);
this.registryClient = checkNotNull(registryClient);
+ this.masterHeartBeatTask = checkNotNull(masterHeartBeatTask);
}
@Override
@@ -50,7 +52,7 @@ public class MasterConnectionStateListener implements ConnectionListener {
case RECONNECTED:
logger.debug("registry connection state is {}, clean the node
info", state);
registryClient.remove(masterNodePath);
- registryClient.persistEphemeral(masterNodePath, "");
+ registryClient.persistEphemeral(masterNodePath,
masterHeartBeatTask.getHeartBeatInfo());
break;
case DISCONNECTED:
logger.warn("registry connection state is {}, ready to stop
myself", state);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 56167f65b4..f8ce97c0e3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -79,6 +79,11 @@ public class MasterRegistryClient implements AutoCloseable {
*/
private ScheduledExecutorService heartBeatExecutor;
+ /**
+ * master heartbeat task
+ */
+ private MasterHeartBeatTask masterHeartBeatTask;
+
/**
* master startup time, ms
*/
@@ -96,7 +101,7 @@ public class MasterRegistryClient implements AutoCloseable {
// master registry
registry();
registryClient.addConnectionStateListener(new
MasterConnectionStateListener(getCurrentNodePath(),
- registryClient));
+ registryClient, masterHeartBeatTask));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new
MasterRegistryDataListener());
} catch (Exception e) {
throw new RegistryException("Master registry client start up
error", e);
@@ -189,7 +194,7 @@ public class MasterRegistryClient implements AutoCloseable {
logger.info("Master node : {} registering to registry center",
masterAddress);
String localNodePath = getCurrentNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
- MasterHeartBeatTask heartBeatTask = new
MasterHeartBeatTask(startupTime,
+ masterHeartBeatTask = new MasterHeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
@@ -197,7 +202,7 @@ public class MasterRegistryClient implements AutoCloseable {
// remove before persist
registryClient.remove(localNodePath);
- registryClient.persistEphemeral(localNodePath,
heartBeatTask.getHeartBeatInfo());
+ registryClient.persistEphemeral(localNodePath,
masterHeartBeatTask.getHeartBeatInfo());
while (!registryClient.checkNodeExists(NetUtils.getHost(),
NodeType.MASTER)) {
logger.warn("The current master server node:{} cannot find in
registry", NetUtils.getHost());
@@ -210,7 +215,7 @@ public class MasterRegistryClient implements AutoCloseable {
// delete dead server
registryClient.handleDeadServer(Collections.singleton(localNodePath),
NodeType.MASTER, Constants.DELETE_OP);
- this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L,
masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
+ this.heartBeatExecutor.scheduleAtFixedRate(masterHeartBeatTask, 0L,
masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
logger.info("Master node : {} registered to registry center
successfully with heartBeatInterval : {}s", masterAddress,
masterHeartbeatInterval);
}