This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new be3bd4c [DS-7004][MasterServer]fix master still work when it lose zk connection (#7045)
be3bd4c is described below
commit be3bd4c83d30773ce6204bca08d3cd1e6444dea9
Author: wind <[email protected]>
AuthorDate: Tue Nov 30 11:00:17 2021 +0800
[DS-7004][MasterServer]fix master still work when it lose zk connection (#7045)
Co-authored-by: caishunfeng <[email protected]>
---
.../master/registry/MasterRegistryClient.java | 49 +++++++++++++++-------
.../master/registry/MasterRegistryClientTest.java | 13 +++++-
2 files changed, 45 insertions(+), 17 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ff8fbf4..7fd3731 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -290,20 +290,20 @@ public class MasterRegistryClient {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (workerHost == null
- || !checkOwner
- || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+ || !checkOwner
+ || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
// only failover the task owned myself if worker down.
if (processInstance == null) {
logger.error("failover error, the process {} of task {} do not exists.",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
+ taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
@@ -358,23 +358,40 @@ public class MasterRegistryClient {
localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMaxCpuLoadAvg(),
- masterConfig.getReservedMemory(),
- Sets.newHashSet(getMasterPath()),
- Constants.MASTER_TYPE,
- registryClient);
+ masterConfig.getMaxCpuLoadAvg(),
+ masterConfig.getReservedMemory(),
+ Sets.newHashSet(getMasterPath()),
+ Constants.MASTER_TYPE,
+ registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
- registryClient.addConnectionStateListener(newState -> {
- if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
- registryClient.persistEphemeral(localNodePath, "");
- }
- });
+ registryClient.addConnectionStateListener(this::handleConnectionState);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
+ public void handleConnectionState(ConnectionState state) {
+ switch (state) {
+ case CONNECTED:
+ logger.debug("registry connection state is {}", state);
+ break;
+ case SUSPENDED:
+ logger.warn("registry connection state is {}, ready to stop myself", state);
+ registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself");
+ break;
+ case RECONNECTED:
+ logger.debug("registry connection state is {}, clean the node info", state);
+ registryClient.persistEphemeral(localNodePath, "");
+ break;
+ case DISCONNECTED:
+ logger.warn("registry connection state is {}, ready to stop myself", state);
+ registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
+ break;
+ default:
+ }
+ }
+
public void deregister() {
try {
String address = getLocalAddress();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 97c45f1..89df27e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -49,7 +50,7 @@ import org.springframework.test.util.ReflectionTestUtils;
* MasterRegistryClientTest
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
+@PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"})
public class MasterRegistryClientTest {
@@ -76,6 +77,9 @@ public class MasterRegistryClientTest {
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
+ given(registryClient.getStoppable()).willReturn(cause -> {
+
+ });
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
@@ -106,6 +110,13 @@ public class MasterRegistryClientTest {
}
@Test
+ public void handleConnectionStateTest() {
+ masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
+ masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
+ masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
+ }
+
+ @Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);