This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 431414729f 【Master】【Bug】Fix a bug,  When the worker service offline, 
workerNodeInfo cache in master cannot delete the offline worker (#15459)
431414729f is described below

commit 431414729fc9a75ba04a6dc01a196f4dd3660557
Author: sleo <[email protected]>
AuthorDate: Thu Jan 25 18:10:36 2024 +0800

    【Master】【Bug】Fix a bug,  When the worker service offline, workerNodeInfo 
cache in master cannot delete the offline worker (#15459)
    
    * remove work from workerNodeInfo when work server down
    
    * modify log msg
    
    Co-authored-by: xiangzihao <[email protected]>
    
    * add unit test
    
    * add unit test
    
    * modify the code style
    
    * add license header
    
    ---------
    
    Co-authored-by: xiangzihao <[email protected]>
    Co-authored-by: Rick Cheng <[email protected]>
---
 .../server/master/registry/ServerNodeManager.java  | 11 +++
 .../master/registry/ServerNodeManagerTest.java     | 93 ++++++++++++++++++++++
 2 files changed, 104 insertions(+)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 7c7095971f..11a994aacd 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -174,6 +174,7 @@ public class ServerNodeManager implements InitializingBean {
                         log.info("Worker: {} added, currentNode : {}", path, 
workerAddress);
                     } else if (type == Type.REMOVE) {
                         log.info("Worker node : {} down.", path);
+                        removeSingleWorkerNode(workerAddress);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                         
listenerEventAlertManager.publishServerDownListenerEvent(path, "WORKER");
                     } else if (type == Type.UPDATE) {
@@ -193,6 +194,16 @@ public class ServerNodeManager implements InitializingBean 
{
                 workerNodeInfoWriteLock.unlock();
             }
         }
+
+        private void removeSingleWorkerNode(String workerAddress) {
+            workerNodeInfoWriteLock.lock();
+            try {
+                workerNodeInfo.remove(workerAddress);
+                log.info("remove worker node {} from workerNodeInfo when 
worker server down", workerAddress);
+            } finally {
+                workerNodeInfoWriteLock.unlock();
+            }
+        }
     }
 
     class MasterDataListener implements SubscribeListener {
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java
new file mode 100644
index 0000000000..fd9e81549f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+public class ServerNodeManagerTest {
+
+    @Mock
+    RegistryClient registryClient;
+
+    @Mock
+    AlertDao alertDao;
+
+    @Mock
+    ListenerEventAlertManager listenerEventAlertManager;
+
+    @InjectMocks
+    ServerNodeManager serverNodeManager;
+
+    @Test
+    public void updateWorkerNodesTest() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+
+        MockitoAnnotations.initMocks(this);
+        HashMap<String, String> workerNodeMaps = new HashMap<>();
+        workerNodeMaps.put("worker-node-1", JSONUtils.toJsonString(new 
WorkerHeartBeat()));
+        workerNodeMaps.put("worker-node-2", JSONUtils.toJsonString(new 
WorkerHeartBeat()));
+
+        
Mockito.when(registryClient.getServerMaps(Mockito.any())).thenReturn(workerNodeMaps);
+        
Mockito.when(registryClient.isWorkerPath(Mockito.anyString())).thenReturn(true);
+
+        // two worker server running (worker-node-1, worker-node-2)
+        Method updateWorkerNodes = 
serverNodeManager.getClass().getDeclaredMethod("updateWorkerNodes");
+        updateWorkerNodes.setAccessible(true);
+        updateWorkerNodes.invoke(serverNodeManager);
+
+        Map<String, WorkerHeartBeat> workerNodeInfo = 
serverNodeManager.getWorkerNodeInfo();
+        Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-1"));
+        Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-2"));
+
+        // receive remove event when worker-node-1 server stop
+        ServerNodeManager.WorkerDataListener workerDataListener = 
serverNodeManager.new WorkerDataListener();
+        Event event = new Event("", "/nodes/worker/worker-node-1", "", 
Event.Type.REMOVE);
+        workerDataListener.notify(event);
+
+        // check worker-node-1 not exist in cache
+        workerNodeInfo = serverNodeManager.getWorkerNodeInfo();
+        Assertions.assertFalse(workerNodeInfo.containsKey("worker-node-1"));
+        Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-2"));
+
+        // worker-node-1 restart, getServerMaps(RegistryNodeType.WORKER) 
method return two worker
+        updateWorkerNodes.invoke(serverNodeManager);
+
+        // check cache
+        workerNodeInfo = serverNodeManager.getWorkerNodeInfo();
+        Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-1"));
+        Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-2"));
+
+    }
+
+}

Reply via email to