This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 73e846d03e [Improvement-14884][Master] Add overload state in master
heartbeat to trigger slot change (#14887)
73e846d03e is described below
commit 73e846d03ebb499cf71d08267ca82c727ae8f21e
Author: Aaron Wang <[email protected]>
AuthorDate: Thu Sep 14 10:36:42 2023 +0800
[Improvement-14884][Master] Add overload state in master heartbeat to
trigger slot change (#14887)
---
.../common/constants/Constants.java | 4 -
.../ServerStatus.java} | 24 +----
.../common/model/MasterHeartBeat.java | 3 +
.../common/model/WorkerHeartBeat.java | 4 +-
.../dispatch/host/LowerWeightHostManager.java | 6 +-
.../master/registry/MasterInfoChangeListener.java | 33 +++---
.../server/master/registry/MasterSlotManager.java | 116 +++++++++++++++++++++
.../server/master/registry/ServerNodeManager.java | 98 +++++++----------
.../master/runner/MasterSchedulerBootstrap.java | 8 +-
.../runner/WorkflowExecuteContextFactory.java | 8 +-
.../server/master/task/MasterHeartBeatTask.java | 8 ++
.../master/registry/MasterSlotManagerTest.java | 94 +++++++++++++++++
.../service/queue/MasterPriorityQueue.java | 4 +-
.../server/worker/task/WorkerHeartBeatTask.java | 22 ++--
14 files changed, 299 insertions(+), 133 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index 5769126659..9ec208e44c 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -652,10 +652,6 @@ public final class Constants {
*/
public static final int AUTHORIZE_READABLE_PERM = 4;
- public static final int NORMAL_NODE_STATUS = 0;
- public static final int ABNORMAL_NODE_STATUS = 1;
- public static final int BUSY_NODE_STATUE = 2;
-
public static final String START_TIME = "start time";
public static final String END_TIME = "end time";
public static final String START_END_DATE = "startDate,endDate";
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
similarity index 58%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
copy to
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
index 52c96defc6..16a0f0e34c 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
@@ -15,28 +15,10 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.model;
+package org.apache.dolphinscheduler.common.enums;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+public enum ServerStatus {
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class MasterHeartBeat implements HeartBeat {
+ NORMAL, ABNORMAL, BUSY
- private long startupTime;
- private long reportTime;
- private double cpuUsage;
- private double memoryUsage;
- private double availablePhysicalMemorySize;
- private double reservedMemory;
- private double diskAvailable;
- private int processId;
-
- private String host;
- private int port;
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
index 52c96defc6..6386f32ae6 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.common.model;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -36,6 +38,7 @@ public class MasterHeartBeat implements HeartBeat {
private double reservedMemory;
private double diskAvailable;
private int processId;
+ private ServerStatus serverStatus;
private String host;
private int port;
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
index d3843d2783..396f227f47 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.common.model;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -36,7 +38,7 @@ public class WorkerHeartBeat implements HeartBeat {
private double availablePhysicalMemorySize;
private double reservedMemory;
private double diskAvailable;
- private int serverStatus;
+ private ServerStatus serverStatus;
private int processId;
private String host;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index a6cb9352a8..607b78abbc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
-import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
@@ -135,12 +135,12 @@ public class LowerWeightHostManager extends
CommonHostManager {
log.warn("worker {} in work group {} have not received the
heartbeat", addr, workerGroup);
return Optional.empty();
}
- if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
+ if (ServerStatus.ABNORMAL == heartBeat.getServerStatus()) {
log.warn("worker {} current cpu load average {} is too high or
available memory {}G is too low",
addr, heartBeat.getLoadAverage(),
heartBeat.getAvailablePhysicalMemorySize());
return Optional.empty();
}
- if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
+ if (ServerStatus.BUSY == heartBeat.getServerStatus()) {
log.warn("worker {} is busy, current waiting task count {} is
large than worker thread count {}",
addr, heartBeat.getWorkerWaitingTaskCount(),
heartBeat.getWorkerExecThreadCount());
return Optional.empty();
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
similarity index 58%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
index 52c96defc6..ffd7628923 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
@@ -15,28 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.model;
+package org.apache.dolphinscheduler.server.master.registry;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class MasterHeartBeat implements HeartBeat {
+import java.util.Map;
- private long startupTime;
- private long reportTime;
- private double cpuUsage;
- private double memoryUsage;
- private double availablePhysicalMemorySize;
- private double reservedMemory;
- private double diskAvailable;
- private int processId;
+/**
+ * The listener used in {@link ServerNodeManager} to notify the change of
master info.
+ */
+public interface MasterInfoChangeListener {
- private String host;
- private int port;
+ /**
+ * Used to notify the change of master info.
+ *
+ * @param masterNodeInfo master node info map, key is master address,
value is master info.
+ */
+ void notify(Map<String, MasterHeartBeat> masterNodeInfo);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
new file mode 100644
index 0000000000..5fb0c74f5a
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class MasterSlotManager {
+
+ @Autowired
+ protected ServerNodeManager serverNodeManager;
+
+ @Autowired
+ protected MasterConfig masterConfig;
+
+ private volatile int currentSlot = 0;
+ private volatile int totalSlot = 0;
+
+ @PostConstruct
+ public void init() {
+ serverNodeManager.addMasterInfoChangeListener(new
MasterSlotManager.SlotChangeListener());
+ }
+
+ public int getSlot() {
+ return currentSlot;
+ }
+
+ public int getMasterSize() {
+ return totalSlot;
+ }
+
+ public class SlotChangeListener implements MasterInfoChangeListener {
+
+ private final Lock slotLock = new ReentrantLock();
+
+ private final MasterPriorityQueue masterPriorityQueue = new
MasterPriorityQueue();
+
+ @Override
+ public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
+ List<Server> serverList = masterNodeInfo.values().stream()
+ .filter(heartBeat ->
!heartBeat.getServerStatus().equals(ServerStatus.ABNORMAL))
+
.map(this::convertHeartBeatToServer).collect(Collectors.toList());
+ syncMasterNodes(serverList);
+ }
+
+ /**
+ * sync master nodes
+ */
+ private void syncMasterNodes(List<Server> masterNodes) {
+ slotLock.lock();
+ try {
+ this.masterPriorityQueue.clear();
+ this.masterPriorityQueue.putAll(masterNodes);
+ int tempCurrentSlot =
masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
+ int tempTotalSlot = masterNodes.size();
+ if (tempCurrentSlot < 0) {
+ totalSlot = 0;
+ currentSlot = 0;
+ log.warn("Current master is not in active master list");
+ } else if (tempCurrentSlot != currentSlot || tempTotalSlot !=
totalSlot) {
+ totalSlot = tempTotalSlot;
+ currentSlot = tempCurrentSlot;
+ log.info("Update master nodes, total master size: {},
current slot: {}", totalSlot, currentSlot);
+ }
+ } finally {
+ slotLock.unlock();
+ }
+ }
+
+ private Server convertHeartBeatToServer(MasterHeartBeat
masterHeartBeat) {
+ Server server = new Server();
+ server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
+ server.setLastHeartbeatTime(new
Date(masterHeartBeat.getReportTime()));
+ server.setId(masterHeartBeat.getProcessId());
+ server.setHost(masterHeartBeat.getHost());
+ server.setPort(masterHeartBeat.getPort());
+
+ return server;
+ }
+
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 852f73ce73..feee60a171 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
@@ -32,7 +32,6 @@ import
org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
-import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
@@ -40,10 +39,8 @@ import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -52,7 +49,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -69,8 +65,6 @@ import org.springframework.stereotype.Service;
@Slf4j
public class ServerNodeManager implements InitializingBean {
- private final Lock masterLock = new ReentrantLock();
-
private final ReentrantReadWriteLock workerGroupLock = new
ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock workerGroupReadLock =
workerGroupLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock =
workerGroupLock.writeLock();
@@ -79,12 +73,14 @@ public class ServerNodeManager implements InitializingBean {
private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock =
workerNodeInfoLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock =
workerNodeInfoLock.writeLock();
+ private final ReentrantLock masterNodeInfoLock = new ReentrantLock();
+
/**
* worker group nodes, workerGroup -> ips, combining
registryWorkerGroupNodes and dbWorkerGroupNodes
*/
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes =
new ConcurrentHashMap<>();
- private final Set<String> masterNodes = new HashSet<>();
+ private final Map<String, MasterHeartBeat> masterNodeInfo = new
HashMap<>();
private final Map<String, WorkerHeartBeat> workerNodeInfo = new
HashMap<>();
@@ -99,8 +95,6 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private WorkerGroupMapper workerGroupMapper;
- private final MasterPriorityQueue masterPriorityQueue = new
MasterPriorityQueue();
-
@Autowired
private AlertDao alertDao;
@@ -109,24 +103,13 @@ public class ServerNodeManager implements
InitializingBean {
private final List<WorkerInfoChangeListener> workerInfoChangeListeners =
new ArrayList<>();
- private volatile int currentSlot = 0;
-
- private volatile int totalSlot = 0;
-
- public int getSlot() {
- return currentSlot;
- }
-
- public int getMasterSize() {
- return totalSlot;
- }
+ private final List<MasterInfoChangeListener> masterInfoChangeListeners =
new ArrayList<>();
@Override
public void afterPropertiesSet() {
// load nodes from zookeeper
- updateMasterNodes();
- refreshWorkerNodesAndGroupMappings();
+ refreshNodesAndGroupMappings();
// init executor service
executorService =
@@ -147,7 +130,7 @@ public class ServerNodeManager implements InitializingBean {
public void run() {
try {
// sync worker node info
- refreshWorkerNodesAndGroupMappings();
+ refreshNodesAndGroupMappings();
} catch (Exception e) {
log.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
}
@@ -155,12 +138,15 @@ public class ServerNodeManager implements
InitializingBean {
}
/**
- * Refresh worker nodes and worker group mapping information
+ * Refresh master/worker nodes and worker group mapping information
*/
- private void refreshWorkerNodesAndGroupMappings() {
+ private void refreshNodesAndGroupMappings() {
updateWorkerNodes();
updateWorkerGroupMappings();
notifyWorkerInfoChangeListeners();
+
+ updateMasterNodes();
+ notifyMasterInfoChangeListeners();
}
/**
@@ -214,11 +200,8 @@ public class ServerNodeManager implements InitializingBean
{
try {
if (type.equals(Type.ADD)) {
log.info("master node : {} added.", path);
- updateMasterNodes();
- }
- if (type.equals(Type.REMOVE)) {
+ } else if (type.equals(Type.REMOVE)) {
log.info("master node : {} down.", path);
- updateMasterNodes();
alertDao.sendServerStoppedAlert(1, path, "MASTER");
}
} catch (Exception ex) {
@@ -229,19 +212,17 @@ public class ServerNodeManager implements
InitializingBean {
}
private void updateMasterNodes() {
- currentSlot = 0;
- totalSlot = 0;
- this.masterNodes.clear();
- String nodeLock = RegistryNodeType.MASTER_NODE_LOCK.getRegistryPath();
+ masterNodeInfoLock.lock();
try {
- registryClient.getLock(nodeLock);
- Collection<String> currentNodes =
registryClient.getMasterNodesDirectly();
- List<Server> masterNodeList =
registryClient.getServerList(RegistryNodeType.MASTER);
- syncMasterNodes(currentNodes, masterNodeList);
+ masterNodeInfo.clear();
+ Map<String, String> masterNodeMaps =
registryClient.getServerMaps(RegistryNodeType.MASTER);
+ for (Map.Entry<String, String> entry : masterNodeMaps.entrySet()) {
+ masterNodeInfo.put(entry.getKey(),
JSONUtils.parseObject(entry.getValue(), MasterHeartBeat.class));
+ }
} catch (Exception e) {
log.error("update master nodes error", e);
} finally {
- registryClient.releaseLock(nodeLock);
+ masterNodeInfoLock.unlock();
}
}
@@ -289,30 +270,6 @@ public class ServerNodeManager implements InitializingBean
{
}
}
- /**
- * sync master nodes
- *
- * @param nodes master nodes
- */
- private void syncMasterNodes(Collection<String> nodes, List<Server>
masterNodes) {
- masterLock.lock();
- try {
- this.masterNodes.addAll(nodes);
- this.masterPriorityQueue.clear();
- this.masterPriorityQueue.putList(masterNodes);
- int index =
masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
- if (index >= 0) {
- totalSlot = nodes.size();
- currentSlot = index;
- } else {
- log.warn("Current master is not in active master list");
- }
- log.info("Update master nodes, total master size: {}, current
slot: {}", totalSlot, currentSlot);
- } finally {
- masterLock.unlock();
- }
- }
-
public Map<String, Set<String>> getWorkerGroupNodes() {
workerGroupReadLock.lock();
try {
@@ -360,6 +317,10 @@ public class ServerNodeManager implements InitializingBean
{
}
}
+ public Map<String, MasterHeartBeat> getMasterNodeInfo() {
+ return Collections.unmodifiableMap(masterNodeInfo);
+ }
+
/**
* Add the resource change listener, when the resource changed, the
listener will be notified.
*
@@ -377,6 +338,17 @@ public class ServerNodeManager implements InitializingBean
{
}
}
+ public synchronized void
addMasterInfoChangeListener(MasterInfoChangeListener listener) {
+ masterInfoChangeListeners.add(listener);
+ }
+
+ private void notifyMasterInfoChangeListeners() {
+ Map<String, MasterHeartBeat> masterNodeInfoMap = getMasterNodeInfo();
+ for (MasterInfoChangeListener listener : masterInfoChangeListeners) {
+ listener.notify(masterNodeInfoMap);
+ }
+ }
+
@PreDestroy
public void destroy() {
executorService.shutdownNow();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 0313e2b4a5..4d84644cb0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -33,7 +33,7 @@ import
org.apache.dolphinscheduler.server.master.exception.MasterException;
import
org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.commons.collections4.CollectionUtils;
@@ -73,7 +73,7 @@ public class MasterSchedulerBootstrap extends
BaseDaemonThread implements AutoCl
private WorkflowEventLooper workflowEventLooper;
@Autowired
- private ServerNodeManager serverNodeManager;
+ private MasterSlotManager masterSlotManager;
@Autowired
private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;
@@ -171,8 +171,8 @@ public class MasterSchedulerBootstrap extends
BaseDaemonThread implements AutoCl
private List<Command> findCommands() throws MasterException {
try {
long scheduleStartTime = System.currentTimeMillis();
- int thisMasterSlot = serverNodeManager.getSlot();
- int masterCount = serverNodeManager.getMasterSize();
+ int thisMasterSlot = masterSlotManager.getSlot();
+ int masterCount = masterSlotManager.getMasterSize();
if (masterCount <= 0) {
log.warn("Master count: {} is invalid, the current slot: {}",
masterCount, thisMasterSlot);
return Collections.emptyList();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
index 0578176e5e..89810aea1f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
@@ -25,7 +25,7 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory;
import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
public class WorkflowExecuteContextFactory {
@Autowired
- private ServerNodeManager serverNodeManager;
+ private MasterSlotManager masterSlotManager;
@Autowired
private ProcessService processService;
@@ -85,8 +85,8 @@ public class WorkflowExecuteContextFactory {
}
private SlotCheckState slotCheck(Command command) {
- int slot = serverNodeManager.getSlot();
- int masterSize = serverNodeManager.getMasterSize();
+ int slot = masterSlotManager.getSlot();
+ int masterSize = masterSlotManager.getMasterSize();
SlotCheckState state;
if (masterSize <= 0) {
state = SlotCheckState.CHANGE;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
index 824d4778ab..f8a9b30e28 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.task;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
@@ -60,6 +61,7 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
.memoryUsage(OSUtils.memoryUsagePercentage())
.diskAvailable(OSUtils.diskAvailable())
.processId(processId)
+ .serverStatus(getServerStatus())
.host(NetUtils.getHost())
.port(masterConfig.getListenPort())
.build();
@@ -72,4 +74,10 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
log.debug("Success write master heartBeatInfo into registry,
masterRegistryPath: {}, heartBeatInfo: {}",
heartBeatPath, masterHeartBeatJson);
}
+
+ private ServerStatus getServerStatus() {
+ return OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory())
+ ? ServerStatus.ABNORMAL
+ : ServerStatus.NORMAL;
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
new file mode 100644
index 0000000000..4ee75a0392
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class MasterSlotManagerTest {
+
+ @InjectMocks
+ private MasterSlotManager masterSlotManager = Mockito.spy(new
MasterSlotManager());
+
+ @Mock
+ private MasterConfig masterConfig;
+
+ @Test
+ void testNormalMasterSlots() {
+ // on normal Master side
+
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:7777");
+
+ sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL);
+ Assertions.assertEquals(1, masterSlotManager.getMasterSize());
+ Assertions.assertEquals(0, masterSlotManager.getSlot());
+
+ sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL);
+ Assertions.assertEquals(2, masterSlotManager.getMasterSize());
+ Assertions.assertEquals(1, masterSlotManager.getSlot());
+ }
+
+ @Test
+ void testOverloadMasterSlots() {
+ // on abnormal Master side
+
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:6666");
+
+ sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL);
+ Assertions.assertEquals(0, masterSlotManager.getMasterSize());
+ Assertions.assertEquals(0, masterSlotManager.getSlot());
+
+ sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL);
+ Assertions.assertEquals(2, masterSlotManager.getMasterSize());
+ Assertions.assertEquals(0, masterSlotManager.getSlot());
+ }
+
+ public void sendHeartBeat(ServerStatus serverStatus1, ServerStatus
serverStatus2) {
+ MasterSlotManager.SlotChangeListener slotChangeListener =
masterSlotManager.new SlotChangeListener();
+
+ Map<String, MasterHeartBeat> masterNodeInfo = new HashMap<>();
+ // generate heartbeat
+ MasterHeartBeat masterHeartBeat1 = MasterHeartBeat.builder()
+ .startupTime(System.currentTimeMillis())
+ .serverStatus(serverStatus1)
+ .host("127.0.0.1")
+ .port(6666)
+ .build();
+ MasterHeartBeat masterHeartBeat2 = MasterHeartBeat.builder()
+ .startupTime(System.currentTimeMillis())
+ .serverStatus(serverStatus2)
+ .host("127.0.0.1")
+ .port(7777)
+ .build();
+ masterNodeInfo.put("127.0.0.1:6666", masterHeartBeat1);
+ masterNodeInfo.put("127.0.0.1:7777", masterHeartBeat2);
+
+ slotChangeListener.notify(masterNodeInfo);
+ }
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
index e5405489e3..d4fe74cd9f 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
@@ -20,10 +20,10 @@ package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -62,7 +62,7 @@ public class MasterPriorityQueue implements
TaskPriorityQueue<Server> {
return queue.size();
}
- public void putList(List<Server> serverList) {
+ public void putAll(Collection<Server> serverList) {
for (Server server : serverList) {
this.queue.put(server);
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index bb9c77e849..1294163e1e 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task;
-import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
@@ -59,7 +59,7 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
double reservedMemory = workerConfig.getReservedMemory();
double memoryUsagePercentage = OSUtils.memoryUsagePercentage();
int execThreads = workerConfig.getExecThreads();
- int serverStatus =
+ ServerStatus serverStatus =
getServerStatus(cpuUsagePercentage, maxCpuUsePercentage,
memoryUsagePercentage, reservedMemory,
execThreads, this.workerWaitingTaskCount.get());
@@ -91,23 +91,23 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
workerRegistryPath, workerHeartBeatJson);
}
- public int getServerStatus(double cpuUsagePercentage,
- double maxCpuUsePercentage,
- double memoryUsagePercentage,
- double reservedMemory,
- int workerExecThreadCount,
- int workerWaitingTaskCount) {
+ private ServerStatus getServerStatus(double cpuUsagePercentage,
+ double maxCpuUsePercentage,
+ double memoryUsagePercentage,
+ double reservedMemory,
+ int workerExecThreadCount,
+ int workerWaitingTaskCount) {
if (cpuUsagePercentage > maxCpuUsePercentage || (1 -
memoryUsagePercentage) < reservedMemory) {
log.warn(
"current cpu load average {} is higher than {} or
available memory {} is lower than {}",
cpuUsagePercentage, maxCpuUsePercentage, 1 -
memoryUsagePercentage, reservedMemory);
- return Constants.ABNORMAL_NODE_STATUS;
+ return ServerStatus.ABNORMAL;
} else if (workerWaitingTaskCount > workerExecThreadCount) {
log.warn("current waiting task count {} is large than worker
thread count {}, worker is busy",
workerWaitingTaskCount, workerExecThreadCount);
- return Constants.BUSY_NODE_STATUE;
+ return ServerStatus.BUSY;
} else {
- return Constants.NORMAL_NODE_STATUS;
+ return ServerStatus.NORMAL;
}
}
}