This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev_wenjun_coronationTask
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 9ef8d4bb051728b1843a0b7329593c06c6f1688b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Oct 28 17:32:47 2022 +0800

    Fix workerGroup will be updated by WorkerDataListener (#183)
---
 .../server/master/registry/ServerNodeManager.java             | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 6a758a934a..b8ed0abd6a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -224,20 +224,15 @@ public class ServerNodeManager implements 
InitializingBean {
                     final String workerAddress = parts[parts.length - 1];
 
                     if (type == Type.ADD) {
+                        // we don't update the workerGroupToAddressMap, this 
node will be added to map after 10s
                         logger.info("worker group node : {} added.", path);
-                        Collection<String> currentNodes = 
registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        logger.info("currentNodes : {}", currentNodes);
-                        syncWorkerGroupNodes(workerGroupName, new 
HashSet<>(currentNodes));
                     } else if (type == Type.REMOVE) {
                         logger.info("worker group node : {} down.", path);
-                        Collection<String> currentNodes = 
registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, new 
HashSet<>(currentNodes));
+                        // Remove the node from workerNodeInfo so that it will not 
receive tasks
+                        workerNodeInfo.remove(workerAddress);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                     } else if (type == Type.UPDATE) {
                         logger.debug("worker group node : {} update, data: 
{}", path, data);
-                        Collection<String> currentNodes = 
registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, new 
HashSet<>(currentNodes));
-
                         syncSingleWorkerNodeInfo(workerAddress, 
JSONUtils.parseObject(data, WorkerHeartBeat.class));
                     }
                 } catch (Exception ex) {

Reply via email to