This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new be3bd4c  [DS-7004][MasterServer]fix master still work when it lose zk connection (#7045)
be3bd4c is described below

commit be3bd4c83d30773ce6204bca08d3cd1e6444dea9
Author: wind <[email protected]>
AuthorDate: Tue Nov 30 11:00:17 2021 +0800

    [DS-7004][MasterServer]fix master still work when it lose zk connection (#7045)
    
    Co-authored-by: caishunfeng <[email protected]>
---
 .../master/registry/MasterRegistryClient.java      | 49 +++++++++++++++-------
 .../master/registry/MasterRegistryClientTest.java  | 13 +++++-
 2 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ff8fbf4..7fd3731 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -290,20 +290,20 @@ public class MasterRegistryClient {
 
             ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
             if (workerHost == null
-                || !checkOwner
-                || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+                    || !checkOwner
+                    || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
                 // only failover the task owned myself if worker down.
                 if (processInstance == null) {
                     logger.error("failover error, the process {} of task {} do not exists.",
-                        taskInstance.getProcessInstanceId(), taskInstance.getId());
+                            taskInstance.getProcessInstanceId(), taskInstance.getId());
                     continue;
                 }
                 taskInstance.setProcessInstance(processInstance);
 
                 TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
-                                                                                       .buildTaskInstanceRelatedInfo(taskInstance)
-                                                                                       .buildProcessInstanceRelatedInfo(processInstance)
-                                                                                       .create();
+                        .buildTaskInstanceRelatedInfo(taskInstance)
+                        .buildProcessInstanceRelatedInfo(processInstance)
+                        .create();
                 // only kill yarn job if exists , the local thread has exited
                 ProcessUtils.killYarnJob(taskExecutionContext);
 
@@ -358,23 +358,40 @@ public class MasterRegistryClient {
         localNodePath = getMasterPath();
         int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-            masterConfig.getMaxCpuLoadAvg(),
-            masterConfig.getReservedMemory(),
-            Sets.newHashSet(getMasterPath()),
-            Constants.MASTER_TYPE,
-            registryClient);
+                masterConfig.getMaxCpuLoadAvg(),
+                masterConfig.getReservedMemory(),
+                Sets.newHashSet(getMasterPath()),
+                Constants.MASTER_TYPE,
+                registryClient);
 
         registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
-        registryClient.addConnectionStateListener(newState -> {
-            if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
-                registryClient.persistEphemeral(localNodePath, "");
-            }
-        });
+        registryClient.addConnectionStateListener(this::handleConnectionState);
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
 
     }
 
+    public void handleConnectionState(ConnectionState state) {
+        switch (state) {
+            case CONNECTED:
+                logger.debug("registry connection state is {}", state);
+                break;
+            case SUSPENDED:
+                logger.warn("registry connection state is {}, ready to stop myself", state);
+                registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself");
+                break;
+            case RECONNECTED:
+                logger.debug("registry connection state is {}, clean the node info", state);
+                registryClient.persistEphemeral(localNodePath, "");
+                break;
+            case DISCONNECTED:
+                logger.warn("registry connection state is {}, ready to stop myself", state);
+                registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
+                break;
+            default:
+        }
+    }
+
     public void deregister() {
         try {
             String address = getLocalAddress();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 97c45f1..89df27e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -49,7 +50,7 @@ import org.springframework.test.util.ReflectionTestUtils;
  * MasterRegistryClientTest
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
+@PrepareForTest({RegistryClient.class})
 @PowerMockIgnore({"javax.management.*"})
 public class MasterRegistryClientTest {
 
@@ -76,6 +77,9 @@ public class MasterRegistryClientTest {
         given(registryClient.getLock(Mockito.anyString())).willReturn(true);
         given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
         given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
+        given(registryClient.getStoppable()).willReturn(cause -> {
+
+        });
         doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
         ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
 
@@ -106,6 +110,13 @@ public class MasterRegistryClientTest {
     }
 
     @Test
+    public void handleConnectionStateTest() {
+        masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
+        masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
+        masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
+    }
+
+    @Test
     public void removeNodePathTest() {
         masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
         masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);

Reply via email to