This is an automated email from the ASF dual-hosted git repository. caishunfeng pushed a commit to branch 3.1.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 3e899bee0613e5cb4787feb53c3465f6220b25f6 Author: Yann Ann <[email protected]> AuthorDate: Sat Sep 24 18:57:20 2022 +0800 [fix#12000]Cannot remove the WorkerGroup from the master service (#12050) * [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000 * remove unnecessary locks * Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java Co-authored-by: caishunfeng <[email protected]> Co-authored-by: caishunfeng <[email protected]> --- .../server/master/registry/ServerNodeManager.java | 167 +++++++++++++-------- 1 file changed, 106 insertions(+), 61 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 147fff1364..ef9111cae7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -17,8 +17,9 @@ package org.apache.dolphinscheduler.server.master.registry; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; @@ -34,14 +35,12 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import javax.annotation.PreDestroy; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -57,8 +56,13 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; +import javax.annotation.PreDestroy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * server node manager @@ -74,16 +78,25 @@ public class ServerNodeManager implements InitializingBean { private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock(); private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock(); - private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock(); private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock(); /** - * worker group nodes, workerGroup -> ips + * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes */ private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>(); + /** + * worker group nodes from registry center, workerGroup -> ips + */ + private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>(); + + /** + * worker group nodes from db, workerGroup -> ips + */ + private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>(); + /** * master nodes */ @@ -138,7 +151,6 @@ public class ServerNodeManager implements InitializingBean { return MASTER_SIZE; } - /** * init listener * @@ -146,22 +158,20 @@ public class ServerNodeManager implements InitializingBean { */ @Override public void afterPropertiesSet() throws Exception { - /** - * load nodes from zookeeper - */ + + // load nodes from zookeeper load(); - /** - * init executor service - */ - executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); + + // init executor service + executorService = + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); + executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); - /* - * init MasterNodeListener listener - */ + + // init MasterNodeListener listener registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener()); - /* - * init WorkerNodeListener listener - */ + + // init WorkerNodeListener listener registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener()); } @@ -169,17 +179,14 @@ public class ServerNodeManager implements InitializingBean { * load nodes from zookeeper */ public void load() { - /* - * master nodes from zookeeper - */ + // master nodes from zookeeper updateMasterNodes(); - /* - * worker group nodes from zookeeper - */ + // worker group nodes from zookeeper Collection<String> workerGroups = registryClient.getWorkerGroupDirectly(); for (String workerGroup : workerGroups) { - syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup)); + syncWorkerGroupNodesFromRegistry(workerGroup, + registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD); } } @@ -191,29 +198,38 @@ public class ServerNodeManager implements InitializingBean { @Override public void run() { try { + dbWorkerGroupNodes.clear(); + // sync worker node info - Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true); + Map<String, String> registryWorkerNodeMap = registryClient + .getServerMaps(NodeType.WORKER, true); syncAllWorkerNodeInfo(registryWorkerNodeMap); // sync worker group nodes from database List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup(); if (CollectionUtils.isNotEmpty(workerGroupList)) { for (WorkerGroup wg : workerGroupList) { String workerGroupName = wg.getName(); - Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg); + Set<String> workerAddress = getWorkerAddressByWorkerGroup( + registryWorkerNodeMap, wg); if (!workerAddress.isEmpty()) { - syncWorkerGroupNodes(workerGroupName, workerAddress); + Set<String> workerNodes = dbWorkerGroupNodes + .getOrDefault(workerGroupName, new HashSet<>()); + workerNodes.clear(); + workerNodes.addAll(workerAddress); + dbWorkerGroupNodes.put(workerGroupName, workerNodes); } } } - notifyWorkerInfoChangeListeners(); } catch (Exception e) { logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e); + } finally { + refreshWorkerGroupNodes(); } } } - - protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) { + protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, + WorkerGroup wg) { Set<String> nodes = new HashSet<>(); String[] addrs = wg.getAddrList().split(Constants.COMMA); for (String addr : addrs) { @@ -238,29 +254,27 @@ public class ServerNodeManager implements InitializingBean { try { String[] parts = path.split("/"); if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { - throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); + throw new IllegalArgumentException( + String.format("worker group path : %s is not valid, ignore", path)); } final String workerGroupName = parts[parts.length - 2]; final String workerAddress = parts[parts.length - 1]; + logger.debug("received subscribe event : {}", event); + Collection<String> currentNodes = registryClient + .getWorkerGroupNodesDirectly(workerGroupName); + syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type); + if (type == Type.ADD) { - logger.info("worker group node : {} added.", path); - Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - logger.info("currentNodes : {}", currentNodes); - syncWorkerGroupNodes(workerGroupName, currentNodes); + logger.info("worker group node : {} added, currentNodes : {}", path, + currentNodes); } else if (type == Type.REMOVE) { logger.info("worker group node : {} down.", path); - Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - syncWorkerGroupNodes(workerGroupName, currentNodes); alertDao.sendServerStoppedAlert(1, path, "WORKER"); } else if (type == Type.UPDATE) { - logger.debug("worker group node : {} update, data: {}", path, data); - Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - syncWorkerGroupNodes(workerGroupName, currentNodes); - - syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class)); + syncSingleWorkerNodeInfo(workerAddress, + JSONUtils.parseObject(data, WorkerHeartBeat.class)); } - notifyWorkerInfoChangeListeners(); } catch (IllegalArgumentException ex) { logger.warn(ex.getMessage()); } catch (Exception ex) { @@ -272,6 +286,7 @@ public class ServerNodeManager implements InitializingBean { } class MasterDataListener implements SubscribeListener { + @Override public void notify(Event event) { final String path = event.path(); @@ -328,28 +343,57 @@ public class ServerNodeManager implements InitializingBean { MASTER_SIZE = nodes.size(); MASTER_SLOT = index; } else { - logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress()); + logger.warn("current addr:{} is not in active master list", + masterConfig.getMasterAddress()); } - logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress()); + logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, + MASTER_SLOT, masterConfig.getMasterAddress()); } finally { masterLock.unlock(); } } /** - * sync worker group nodes + * sync worker group nodes from registry center * * @param workerGroup worker group * @param nodes worker nodes + * @param type event type + */ + private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes, + Type type) { + try { + if (type == Type.REMOVE) { + if (!registryWorkerGroupNodes.containsKey(workerGroup)) { + logger.warn("cannot remove worker group {}, not in active list", workerGroup); + return; + } + registryWorkerGroupNodes.remove(workerGroup); + } else { + Set<String> workerNodes = registryWorkerGroupNodes + .getOrDefault(workerGroup, new HashSet<>()); + workerNodes.clear(); + workerNodes.addAll(nodes); + registryWorkerGroupNodes.put(workerGroup, workerNodes); + } + } finally { + refreshWorkerGroupNodes(); + } + } + + /** + * refresh worker group nodes */ - private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) { + private void refreshWorkerGroupNodes() { workerGroupWriteLock.lock(); try { - Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>()); - workerNodes.clear(); - workerNodes.addAll(nodes); - workerGroupNodes.put(workerGroup, workerNodes); + workerGroupNodes.clear(); + workerGroupNodes.putAll(registryWorkerGroupNodes); + workerGroupNodes.putAll(dbWorkerGroupNodes); + logger.debug("refresh worker group nodes, current list: {}", + Arrays.toString(workerGroupNodes.keySet().toArray())); } finally { + notifyWorkerInfoChangeListeners(); workerGroupWriteLock.unlock(); } } @@ -414,7 +458,8 @@ public class ServerNodeManager implements InitializingBean { try { workerNodeInfo.clear(); for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) { - workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); + workerNodeInfo.put(entry.getKey(), + JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); } } finally { workerNodeInfoWriteLock.unlock();
