This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch dev_wenjun_coronationTask in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 9ef8d4bb051728b1843a0b7329593c06c6f1688b Author: Wenjun Ruan <[email protected]> AuthorDate: Fri Oct 28 17:32:47 2022 +0800 Fix workerGroup will be updated by WorkerDataListener (#183) --- .../server/master/registry/ServerNodeManager.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 6a758a934a..b8ed0abd6a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -224,20 +224,15 @@ public class ServerNodeManager implements InitializingBean { final String workerAddress = parts[parts.length - 1]; if (type == Type.ADD) { + // we don't update the workerGroupToAddressMap, this node will be added to map after 10s logger.info("worker group node : {} added.", path); - Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - logger.info("currentNodes : {}", currentNodes); - syncWorkerGroupNodes(workerGroupName, new HashSet<>(currentNodes)); } else if (type == Type.REMOVE) { logger.info("worker group node : {} down.", path); - Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - syncWorkerGroupNodes(workerGroupName, new HashSet<>(currentNodes)); + // Remove the node from workerNodeInfo, it will not receive task + workerNodeInfo.remove(workerAddress); alertDao.sendServerStoppedAlert(1, path, "WORKER"); } else if (type == Type.UPDATE) { logger.debug("worker group node : {} update, data: {}", path, data); - Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - syncWorkerGroupNodes(workerGroupName, new HashSet<>(currentNodes)); - syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class)); } } catch (Exception ex) {
