This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.1-prepare by this push:
new 19cb123 [cherry-pick][DS-7004][MasterServer]fix master still work
when it lose zk connection (#7058)
19cb123 is described below
commit 19cb1237b36d3de0d5ae29ee99142ef28fbe8242
Author: wind <[email protected]>
AuthorDate: Tue Nov 30 11:55:38 2021 +0800
[cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk
connection (#7058)
---
.../master/registry/MasterRegistryClient.java | 39 ++++++++++++++++------
.../master/registry/MasterRegistryClientTest.java | 13 +++++++-
2 files changed, 40 insertions(+), 12 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ee6349a..318a06f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -290,20 +290,20 @@ public class MasterRegistryClient {
ProcessInstance processInstance =
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (workerHost == null
- || !checkOwner
- ||
processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+ || !checkOwner
+ ||
processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
// only failover the task owned myself if worker down.
if (processInstance == null) {
logger.error("failover error, the process {} of task {} do
not exists.",
- taskInstance.getProcessInstanceId(),
taskInstance.getId());
+ taskInstance.getProcessInstanceId(),
taskInstance.getId());
continue;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
-
.buildTaskInstanceRelatedInfo(taskInstance)
-
.buildProcessInstanceRelatedInfo(processInstance)
-
.create();
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
@@ -364,16 +364,33 @@ public class MasterRegistryClient {
registryClient);
registryClient.persistEphemeral(localNodePath,
heartBeatTask.getHeartBeatInfo());
- registryClient.addConnectionStateListener(newState -> {
- if (newState == ConnectionState.RECONNECTED || newState ==
ConnectionState.SUSPENDED) {
- registryClient.persistEphemeral(localNodePath, "");
- }
- });
+ registryClient.addConnectionStateListener(this::handleConnectionState);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
+ public void handleConnectionState(ConnectionState state) {
+ switch (state) {
+ case CONNECTED:
+ logger.debug("registry connection state is {}", state);
+ break;
+ case SUSPENDED:
+ logger.warn("registry connection state is {}, ready to stop
myself", state);
+ registryClient.getStoppable().stop("registry connection state
is SUSPENDED, stop myself");
+ break;
+ case RECONNECTED:
+ logger.debug("registry connection state is {}, clean the node
info", state);
+ registryClient.persistEphemeral(localNodePath, "");
+ break;
+ case DISCONNECTED:
+ logger.warn("registry connection state is {}, ready to stop
myself", state);
+ registryClient.getStoppable().stop("registry connection state
is DISCONNECTED, stop myself");
+ break;
+ default:
+ }
+ }
+
public void deregister() {
try {
String address = getLocalAddress();
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index c643818..65d6b89 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -48,7 +49,7 @@ import org.springframework.test.util.ReflectionTestUtils;
* MasterRegistryClientTest
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
+@PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"})
public class MasterRegistryClientTest {
@@ -72,6 +73,9 @@ public class MasterRegistryClientTest {
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
+ given(registryClient.getStoppable()).willReturn(cause -> {
+
+ });
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(),
Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient",
registryClient);
@@ -102,6 +106,13 @@ public class MasterRegistryClientTest {
}
@Test
+    public void handleConnectionStateTest() {
+        for (ConnectionState state : ConnectionState.values()) { // every state, incl. DISCONNECTED, must be handled without throwing
+            masterRegistryClient.handleConnectionState(state);
+        }
+    }
+
+ @Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);