This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 73e846d03e [Improvement-14884][Master] Add overload state in master heartbeat to trigger slot change (#14887)
73e846d03e is described below

commit 73e846d03ebb499cf71d08267ca82c727ae8f21e
Author: Aaron Wang <[email protected]>
AuthorDate: Thu Sep 14 10:36:42 2023 +0800

    [Improvement-14884][Master] Add overload state in master heartbeat to trigger slot change (#14887)
---
 .../common/constants/Constants.java                |   4 -
 .../ServerStatus.java}                             |  24 +----
 .../common/model/MasterHeartBeat.java              |   3 +
 .../common/model/WorkerHeartBeat.java              |   4 +-
 .../dispatch/host/LowerWeightHostManager.java      |   6 +-
 .../master/registry/MasterInfoChangeListener.java  |  33 +++---
 .../server/master/registry/MasterSlotManager.java  | 116 +++++++++++++++++++++
 .../server/master/registry/ServerNodeManager.java  |  98 +++++++----------
 .../master/runner/MasterSchedulerBootstrap.java    |   8 +-
 .../runner/WorkflowExecuteContextFactory.java      |   8 +-
 .../server/master/task/MasterHeartBeatTask.java    |   8 ++
 .../master/registry/MasterSlotManagerTest.java     |  94 +++++++++++++++++
 .../service/queue/MasterPriorityQueue.java         |   4 +-
 .../server/worker/task/WorkerHeartBeatTask.java    |  22 ++--
 14 files changed, 299 insertions(+), 133 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index 5769126659..9ec208e44c 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -652,10 +652,6 @@ public final class Constants {
      */
     public static final int AUTHORIZE_READABLE_PERM = 4;
 
-    public static final int NORMAL_NODE_STATUS = 0;
-    public static final int ABNORMAL_NODE_STATUS = 1;
-    public static final int BUSY_NODE_STATUE = 2;
-
     public static final String START_TIME = "start time";
     public static final String END_TIME = "end time";
     public static final String START_END_DATE = "startDate,endDate";
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
similarity index 58%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
copy to 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
index 52c96defc6..16a0f0e34c 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
@@ -15,28 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.model;
+package org.apache.dolphinscheduler.common.enums;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+public enum ServerStatus {
 
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class MasterHeartBeat implements HeartBeat {
+    NORMAL, ABNORMAL, BUSY
 
-    private long startupTime;
-    private long reportTime;
-    private double cpuUsage;
-    private double memoryUsage;
-    private double availablePhysicalMemorySize;
-    private double reservedMemory;
-    private double diskAvailable;
-    private int processId;
-
-    private String host;
-    private int port;
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
index 52c96defc6..6386f32ae6 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.common.model;
 
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -36,6 +38,7 @@ public class MasterHeartBeat implements HeartBeat {
     private double reservedMemory;
     private double diskAvailable;
     private int processId;
+    private ServerStatus serverStatus;
 
     private String host;
     private int port;
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
index d3843d2783..396f227f47 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.common.model;
 
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -36,7 +38,7 @@ public class WorkerHeartBeat implements HeartBeat {
     private double availablePhysicalMemorySize;
     private double reservedMemory;
     private double diskAvailable;
-    private int serverStatus;
+    private ServerStatus serverStatus;
     private int processId;
 
     private String host;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index a6cb9352a8..607b78abbc 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
@@ -135,12 +135,12 @@ public class LowerWeightHostManager extends 
CommonHostManager {
             log.warn("worker {} in work group {} have not received the 
heartbeat", addr, workerGroup);
             return Optional.empty();
         }
-        if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
+        if (ServerStatus.ABNORMAL == heartBeat.getServerStatus()) {
             log.warn("worker {} current cpu load average {} is too high or 
available memory {}G is too low",
                     addr, heartBeat.getLoadAverage(), 
heartBeat.getAvailablePhysicalMemorySize());
             return Optional.empty();
         }
-        if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
+        if (ServerStatus.BUSY == heartBeat.getServerStatus()) {
             log.warn("worker {} is busy, current waiting task count {} is 
large than worker thread count {}",
                     addr, heartBeat.getWorkerWaitingTaskCount(), 
heartBeat.getWorkerExecThreadCount());
             return Optional.empty();
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
similarity index 58%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
index 52c96defc6..ffd7628923 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
@@ -15,28 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.model;
+package org.apache.dolphinscheduler.server.master.registry;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
 
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class MasterHeartBeat implements HeartBeat {
+import java.util.Map;
 
-    private long startupTime;
-    private long reportTime;
-    private double cpuUsage;
-    private double memoryUsage;
-    private double availablePhysicalMemorySize;
-    private double reservedMemory;
-    private double diskAvailable;
-    private int processId;
+/**
+ * The listener used in {@link ServerNodeManager} to notify the change of master info.
+ */
+public interface MasterInfoChangeListener {
 
-    private String host;
-    private int port;
+    /**
+     * Used to notify the change of master info.
+     *
+     * @param masterNodeInfo master node info map, key is master address, value is master info.
+     */
+    void notify(Map<String, MasterHeartBeat> masterNodeInfo);
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
new file mode 100644
index 0000000000..5fb0c74f5a
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class MasterSlotManager {
+
+    @Autowired
+    protected ServerNodeManager serverNodeManager;
+
+    @Autowired
+    protected MasterConfig masterConfig;
+
+    private volatile int currentSlot = 0;
+    private volatile int totalSlot = 0;
+
+    @PostConstruct
+    public void init() {
+        serverNodeManager.addMasterInfoChangeListener(new 
MasterSlotManager.SlotChangeListener());
+    }
+
+    public int getSlot() {
+        return currentSlot;
+    }
+
+    public int getMasterSize() {
+        return totalSlot;
+    }
+
+    public class SlotChangeListener implements MasterInfoChangeListener {
+
+        private final Lock slotLock = new ReentrantLock();
+
+        private final MasterPriorityQueue masterPriorityQueue = new 
MasterPriorityQueue();
+
+        @Override
+        public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
+            List<Server> serverList = masterNodeInfo.values().stream()
+                    .filter(heartBeat -> 
!heartBeat.getServerStatus().equals(ServerStatus.ABNORMAL))
+                    
.map(this::convertHeartBeatToServer).collect(Collectors.toList());
+            syncMasterNodes(serverList);
+        }
+
+        /**
+         * sync master nodes
+         */
+        private void syncMasterNodes(List<Server> masterNodes) {
+            slotLock.lock();
+            try {
+                this.masterPriorityQueue.clear();
+                this.masterPriorityQueue.putAll(masterNodes);
+                int tempCurrentSlot = 
masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
+                int tempTotalSlot = masterNodes.size();
+                if (tempCurrentSlot < 0) {
+                    totalSlot = 0;
+                    currentSlot = 0;
+                    log.warn("Current master is not in active master list");
+                } else if (tempCurrentSlot != currentSlot || tempTotalSlot != 
totalSlot) {
+                    totalSlot = tempTotalSlot;
+                    currentSlot = tempCurrentSlot;
+                    log.info("Update master nodes, total master size: {}, 
current slot: {}", totalSlot, currentSlot);
+                }
+            } finally {
+                slotLock.unlock();
+            }
+        }
+
+        private Server convertHeartBeatToServer(MasterHeartBeat 
masterHeartBeat) {
+            Server server = new Server();
+            server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
+            server.setLastHeartbeatTime(new 
Date(masterHeartBeat.getReportTime()));
+            server.setId(masterHeartBeat.getProcessId());
+            server.setHost(masterHeartBeat.getHost());
+            server.setPort(masterHeartBeat.getPort());
+
+            return server;
+        }
+
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 852f73ce73..feee60a171 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.server.master.registry;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
@@ -32,7 +32,6 @@ import 
org.apache.dolphinscheduler.registry.api.SubscribeListener;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
-import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
@@ -40,10 +39,8 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -52,7 +49,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
@@ -69,8 +65,6 @@ import org.springframework.stereotype.Service;
 @Slf4j
 public class ServerNodeManager implements InitializingBean {
 
-    private final Lock masterLock = new ReentrantLock();
-
     private final ReentrantReadWriteLock workerGroupLock = new 
ReentrantReadWriteLock();
     private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = 
workerGroupLock.readLock();
     private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = 
workerGroupLock.writeLock();
@@ -79,12 +73,14 @@ public class ServerNodeManager implements InitializingBean {
     private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = 
workerNodeInfoLock.readLock();
     private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = 
workerNodeInfoLock.writeLock();
 
+    private final ReentrantLock masterNodeInfoLock = new ReentrantLock();
+
     /**
      * worker group nodes, workerGroup -> ips, combining 
registryWorkerGroupNodes and dbWorkerGroupNodes
      */
     private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = 
new ConcurrentHashMap<>();
 
-    private final Set<String> masterNodes = new HashSet<>();
+    private final Map<String, MasterHeartBeat> masterNodeInfo = new 
HashMap<>();
 
     private final Map<String, WorkerHeartBeat> workerNodeInfo = new 
HashMap<>();
 
@@ -99,8 +95,6 @@ public class ServerNodeManager implements InitializingBean {
     @Autowired
     private WorkerGroupMapper workerGroupMapper;
 
-    private final MasterPriorityQueue masterPriorityQueue = new 
MasterPriorityQueue();
-
     @Autowired
     private AlertDao alertDao;
 
@@ -109,24 +103,13 @@ public class ServerNodeManager implements 
InitializingBean {
 
     private final List<WorkerInfoChangeListener> workerInfoChangeListeners = 
new ArrayList<>();
 
-    private volatile int currentSlot = 0;
-
-    private volatile int totalSlot = 0;
-
-    public int getSlot() {
-        return currentSlot;
-    }
-
-    public int getMasterSize() {
-        return totalSlot;
-    }
+    private final List<MasterInfoChangeListener> masterInfoChangeListeners = 
new ArrayList<>();
 
     @Override
     public void afterPropertiesSet() {
 
         // load nodes from zookeeper
-        updateMasterNodes();
-        refreshWorkerNodesAndGroupMappings();
+        refreshNodesAndGroupMappings();
 
         // init executor service
         executorService =
@@ -147,7 +130,7 @@ public class ServerNodeManager implements InitializingBean {
         public void run() {
             try {
                 // sync worker node info
-                refreshWorkerNodesAndGroupMappings();
+                refreshNodesAndGroupMappings();
             } catch (Exception e) {
                 log.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
             }
@@ -155,12 +138,15 @@ public class ServerNodeManager implements 
InitializingBean {
     }
 
     /**
-     * Refresh worker nodes and worker group mapping information
+     * Refresh master/worker nodes and worker group mapping information
      */
-    private void refreshWorkerNodesAndGroupMappings() {
+    private void refreshNodesAndGroupMappings() {
         updateWorkerNodes();
         updateWorkerGroupMappings();
         notifyWorkerInfoChangeListeners();
+
+        updateMasterNodes();
+        notifyMasterInfoChangeListeners();
     }
 
     /**
@@ -214,11 +200,8 @@ public class ServerNodeManager implements InitializingBean 
{
                 try {
                     if (type.equals(Type.ADD)) {
                         log.info("master node : {} added.", path);
-                        updateMasterNodes();
-                    }
-                    if (type.equals(Type.REMOVE)) {
+                    } else if (type.equals(Type.REMOVE)) {
                         log.info("master node : {} down.", path);
-                        updateMasterNodes();
                         alertDao.sendServerStoppedAlert(1, path, "MASTER");
                     }
                 } catch (Exception ex) {
@@ -229,19 +212,17 @@ public class ServerNodeManager implements 
InitializingBean {
     }
 
     private void updateMasterNodes() {
-        currentSlot = 0;
-        totalSlot = 0;
-        this.masterNodes.clear();
-        String nodeLock = RegistryNodeType.MASTER_NODE_LOCK.getRegistryPath();
+        masterNodeInfoLock.lock();
         try {
-            registryClient.getLock(nodeLock);
-            Collection<String> currentNodes = 
registryClient.getMasterNodesDirectly();
-            List<Server> masterNodeList = 
registryClient.getServerList(RegistryNodeType.MASTER);
-            syncMasterNodes(currentNodes, masterNodeList);
+            masterNodeInfo.clear();
+            Map<String, String> masterNodeMaps = 
registryClient.getServerMaps(RegistryNodeType.MASTER);
+            for (Map.Entry<String, String> entry : masterNodeMaps.entrySet()) {
+                masterNodeInfo.put(entry.getKey(), 
JSONUtils.parseObject(entry.getValue(), MasterHeartBeat.class));
+            }
         } catch (Exception e) {
             log.error("update master nodes error", e);
         } finally {
-            registryClient.releaseLock(nodeLock);
+            masterNodeInfoLock.unlock();
         }
 
     }
@@ -289,30 +270,6 @@ public class ServerNodeManager implements InitializingBean 
{
         }
     }
 
-    /**
-     * sync master nodes
-     *
-     * @param nodes master nodes
-     */
-    private void syncMasterNodes(Collection<String> nodes, List<Server> 
masterNodes) {
-        masterLock.lock();
-        try {
-            this.masterNodes.addAll(nodes);
-            this.masterPriorityQueue.clear();
-            this.masterPriorityQueue.putList(masterNodes);
-            int index = 
masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
-            if (index >= 0) {
-                totalSlot = nodes.size();
-                currentSlot = index;
-            } else {
-                log.warn("Current master is not in active master list");
-            }
-            log.info("Update master nodes, total master size: {}, current 
slot: {}", totalSlot, currentSlot);
-        } finally {
-            masterLock.unlock();
-        }
-    }
-
     public Map<String, Set<String>> getWorkerGroupNodes() {
         workerGroupReadLock.lock();
         try {
@@ -360,6 +317,10 @@ public class ServerNodeManager implements InitializingBean 
{
         }
     }
 
+    public Map<String, MasterHeartBeat> getMasterNodeInfo() {
+        return Collections.unmodifiableMap(masterNodeInfo);
+    }
+
     /**
      * Add the resource change listener, when the resource changed, the 
listener will be notified.
      *
@@ -377,6 +338,17 @@ public class ServerNodeManager implements InitializingBean 
{
         }
     }
 
+    public synchronized void 
addMasterInfoChangeListener(MasterInfoChangeListener listener) {
+        masterInfoChangeListeners.add(listener);
+    }
+
+    private void notifyMasterInfoChangeListeners() {
+        Map<String, MasterHeartBeat> masterNodeInfoMap = getMasterNodeInfo();
+        for (MasterInfoChangeListener listener : masterInfoChangeListeners) {
+            listener.notify(masterNodeInfoMap);
+        }
+    }
+
     @PreDestroy
     public void destroy() {
         executorService.shutdownNow();
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 0313e2b4a5..4d84644cb0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -33,7 +33,7 @@ import 
org.apache.dolphinscheduler.server.master.exception.MasterException;
 import 
org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
 import org.apache.dolphinscheduler.service.command.CommandService;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -73,7 +73,7 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
     private WorkflowEventLooper workflowEventLooper;
 
     @Autowired
-    private ServerNodeManager serverNodeManager;
+    private MasterSlotManager masterSlotManager;
 
     @Autowired
     private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;
@@ -171,8 +171,8 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
     private List<Command> findCommands() throws MasterException {
         try {
             long scheduleStartTime = System.currentTimeMillis();
-            int thisMasterSlot = serverNodeManager.getSlot();
-            int masterCount = serverNodeManager.getMasterSize();
+            int thisMasterSlot = masterSlotManager.getSlot();
+            int masterCount = masterSlotManager.getMasterSize();
             if (masterCount <= 0) {
                 log.warn("Master count: {} is invalid, the current slot: {}", 
masterCount, thisMasterSlot);
                 return Collections.emptyList();
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
index 0578176e5e..89810aea1f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
@@ -25,7 +25,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
 import org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory;
 import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
 public class WorkflowExecuteContextFactory {
 
     @Autowired
-    private ServerNodeManager serverNodeManager;
+    private MasterSlotManager masterSlotManager;
 
     @Autowired
     private ProcessService processService;
@@ -85,8 +85,8 @@ public class WorkflowExecuteContextFactory {
     }
 
     private SlotCheckState slotCheck(Command command) {
-        int slot = serverNodeManager.getSlot();
-        int masterSize = serverNodeManager.getMasterSize();
+        int slot = masterSlotManager.getSlot();
+        int masterSize = masterSlotManager.getMasterSize();
         SlotCheckState state;
         if (masterSize <= 0) {
             state = SlotCheckState.CHANGE;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
index 824d4778ab..f8a9b30e28 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.task;
 
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
 import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
@@ -60,6 +61,7 @@ public class MasterHeartBeatTask extends 
BaseHeartBeatTask<MasterHeartBeat> {
                 .memoryUsage(OSUtils.memoryUsagePercentage())
                 .diskAvailable(OSUtils.diskAvailable())
                 .processId(processId)
+                .serverStatus(getServerStatus())
                 .host(NetUtils.getHost())
                 .port(masterConfig.getListenPort())
                 .build();
@@ -72,4 +74,10 @@ public class MasterHeartBeatTask extends 
BaseHeartBeatTask<MasterHeartBeat> {
         log.debug("Success write master heartBeatInfo into registry, 
masterRegistryPath: {}, heartBeatInfo: {}",
                 heartBeatPath, masterHeartBeatJson);
     }
+
+    private ServerStatus getServerStatus() {
+        return OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), 
masterConfig.getReservedMemory())
+                ? ServerStatus.ABNORMAL
+                : ServerStatus.NORMAL;
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
new file mode 100644
index 0000000000..4ee75a0392
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class MasterSlotManagerTest {
+
+    @InjectMocks
+    private MasterSlotManager masterSlotManager = Mockito.spy(new 
MasterSlotManager());
+
+    @Mock
+    private MasterConfig masterConfig;
+
+    @Test
+    void testNormalMasterSlots() {
+        // Mocked master address is 127.0.0.1:7777, the node that reports NORMAL in both heartbeats below.
+        
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:7777");
+
+        sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL);
+        Assertions.assertEquals(1, masterSlotManager.getMasterSize()); // presumably only NORMAL masters are counted; confirm in MasterSlotManager
+        Assertions.assertEquals(0, masterSlotManager.getSlot());
+
+        sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL);
+        Assertions.assertEquals(2, masterSlotManager.getMasterSize());
+        Assertions.assertEquals(1, masterSlotManager.getSlot());
+    }
+
+    @Test
+    void testOverloadMasterSlots() {
+        // Mocked master address is 127.0.0.1:6666, the node that reports ABNORMAL in the first heartbeat.
+        
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:6666");
+
+        sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL);
+        Assertions.assertEquals(0, masterSlotManager.getMasterSize()); // presumably an ABNORMAL local master sees no slots; confirm in MasterSlotManager
+        Assertions.assertEquals(0, masterSlotManager.getSlot());
+
+        sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL);
+        Assertions.assertEquals(2, masterSlotManager.getMasterSize());
+        Assertions.assertEquals(0, masterSlotManager.getSlot());
+    }
+
+    public void sendHeartBeat(ServerStatus serverStatus1, ServerStatus 
serverStatus2) {
+        MasterSlotManager.SlotChangeListener slotChangeListener = 
masterSlotManager.new SlotChangeListener();
+
+        Map<String, MasterHeartBeat> masterNodeInfo = new HashMap<>();
+        // Build two heartbeats: serverStatus1 for 127.0.0.1:6666, serverStatus2 for 127.0.0.1:7777.
+        MasterHeartBeat masterHeartBeat1 = MasterHeartBeat.builder()
+                .startupTime(System.currentTimeMillis())
+                .serverStatus(serverStatus1)
+                .host("127.0.0.1")
+                .port(6666)
+                .build();
+        MasterHeartBeat masterHeartBeat2 = MasterHeartBeat.builder()
+                .startupTime(System.currentTimeMillis())
+                .serverStatus(serverStatus2)
+                .host("127.0.0.1")
+                .port(7777)
+                .build();
+        masterNodeInfo.put("127.0.0.1:6666", masterHeartBeat1);
+        masterNodeInfo.put("127.0.0.1:7777", masterHeartBeat2);
+
+        slotChangeListener.notify(masterNodeInfo); // hand both heartbeats to the listener in a single call
+    }
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
index e5405489e3..d4fe74cd9f 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
@@ -20,10 +20,10 @@ package org.apache.dolphinscheduler.service.queue;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -62,7 +62,7 @@ public class MasterPriorityQueue implements 
TaskPriorityQueue<Server> {
         return queue.size();
     }
 
-    public void putList(List<Server> serverList) {
+    public void putAll(Collection<Server> serverList) {
         for (Server server : serverList) {
             this.queue.put(server);
         }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index bb9c77e849..1294163e1e 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.task;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
@@ -59,7 +59,7 @@ public class WorkerHeartBeatTask extends 
BaseHeartBeatTask<WorkerHeartBeat> {
         double reservedMemory = workerConfig.getReservedMemory();
         double memoryUsagePercentage = OSUtils.memoryUsagePercentage();
         int execThreads = workerConfig.getExecThreads();
-        int serverStatus =
+        ServerStatus serverStatus =
                 getServerStatus(cpuUsagePercentage, maxCpuUsePercentage, 
memoryUsagePercentage, reservedMemory,
                         execThreads, this.workerWaitingTaskCount.get());
 
@@ -91,23 +91,23 @@ public class WorkerHeartBeatTask extends 
BaseHeartBeatTask<WorkerHeartBeat> {
                 workerRegistryPath, workerHeartBeatJson);
     }
 
-    public int getServerStatus(double cpuUsagePercentage,
-                               double maxCpuUsePercentage,
-                               double memoryUsagePercentage,
-                               double reservedMemory,
-                               int workerExecThreadCount,
-                               int workerWaitingTaskCount) {
+    private ServerStatus getServerStatus(double cpuUsagePercentage,
+                                         double maxCpuUsePercentage,
+                                         double memoryUsagePercentage,
+                                         double reservedMemory,
+                                         int workerExecThreadCount,
+                                         int workerWaitingTaskCount) {
         if (cpuUsagePercentage > maxCpuUsePercentage || (1 - 
memoryUsagePercentage) < reservedMemory) {
             log.warn(
                     "current cpu load average {} is higher than {} or 
available memory {} is lower than {}",
                     cpuUsagePercentage, maxCpuUsePercentage, 1 - 
memoryUsagePercentage, reservedMemory);
-            return Constants.ABNORMAL_NODE_STATUS;
+            return ServerStatus.ABNORMAL; // CPU usage above the maximum, or (1 - memory usage) below the reserved amount
         } else if (workerWaitingTaskCount > workerExecThreadCount) {
             log.warn("current waiting task count {} is large than worker 
thread count {}, worker is busy",
                     workerWaitingTaskCount, workerExecThreadCount);
-            return Constants.BUSY_NODE_STATUE;
+            return ServerStatus.BUSY; // more waiting tasks than worker exec threads
         } else {
-            return Constants.NORMAL_NODE_STATUS;
+            return ServerStatus.NORMAL; // neither of the two checks above fired
         }
     }
 }


Reply via email to