This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 50f1766 [DS-6499][WorkerServer] report busy state when worker is
overload (#6512)
50f1766 is described below
commit 50f1766903f27ce0960e133b30cf394187e8c5c1
Author: wind <[email protected]>
AuthorDate: Wed Oct 13 11:45:56 2021 +0800
[DS-6499][WorkerServer] report busy state when worker is overload (#6512)
* [DS-6499][WorkerServer] report busy state when worker is overload
---
.../apache/dolphinscheduler/common/Constants.java | 4 +-
.../dolphinscheduler/common/utils/HeartBeat.java | 248 +++++++++++++++++++++
.../dolphinscheduler/common/utils/ResInfo.java | 135 -----------
.../common/utils/HeartBeatTest.java | 76 +++++++
.../master/dispatch/host/CommonHostManager.java | 15 +-
.../dispatch/host/LowerWeightHostManager.java | 40 ++--
.../master/registry/MasterRegistryClient.java | 11 +-
.../server/registry/HeartBeatTask.java | 81 +++----
.../server/worker/WorkerServer.java | 12 +-
.../worker/processor/TaskExecuteProcessor.java | 2 +-
.../worker/registry/WorkerRegistryClient.java | 22 +-
.../server/worker/runner/WorkerManagerThread.java | 14 +-
.../service/registry/RegistryClient.java | 15 +-
13 files changed, 432 insertions(+), 243 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 2d69504..e662347 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -521,8 +521,7 @@ public final class Constants {
/**
* heartbeat for zk info length
*/
- public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
- public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH =
11;
+ public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 13;
/**
* jar
@@ -1029,6 +1028,7 @@ public final class Constants {
public static final int NORMAL_NODE_STATUS = 0;
public static final int ABNORMAL_NODE_STATUS = 1;
+ public static final int BUSY_NODE_STATUE = 2;
public static final String START_TIME = "start time";
public static final String END_TIME = "end time";
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
new file mode 100644
index 0000000..bec0f75
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.Constants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load and resource snapshot that a master or worker publishes to the registry as its heartbeat.
+ *
+ * <p>{@link #encodeHeartBeat()} writes {@link Constants#HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH}
+ * comma separated fields in this order: cpuUsage, memoryUsage, loadAverage,
+ * availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory, startupTime, reportTime,
+ * serverStatus, processId, workerHostWeight, workerExecThreadCount, workerWaitingTaskCount;
+ * {@link #decodeHeartBeat(String)} reads the same layout back.
+ *
+ * <p>Instances are mutable and do no synchronization of their own.
+ */
+public class HeartBeat {
+
+    private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class);
+
+    /** Field separator used by both {@link #encodeHeartBeat()} and {@link #decodeHeartBeat(String)}. */
+    public static final String COMMA = ",";
+
+    private long startupTime;
+    // epoch millis of the most recent encodeHeartBeat() call (construction time until then)
+    private long reportTime;
+    private double cpuUsage;
+    private double memoryUsage;
+    private double loadAverage;
+    // compared against reservedMemory; both are logged with a "G" suffix
+    private double availablePhysicalMemorySize;
+    private double maxCpuloadAvg;
+    private double reservedMemory;
+    // Constants.NORMAL_NODE_STATUS, ABNORMAL_NODE_STATUS or BUSY_NODE_STATUE
+    private int serverStatus;
+    private int processId;
+
+    private int workerHostWeight; // worker host weight
+    private int workerWaitingTaskCount; // worker waiting task count
+    private int workerExecThreadCount; // worker thread pool thread count
+
+    public long getStartupTime() {
+        return startupTime;
+    }
+
+    public void setStartupTime(long startupTime) {
+        this.startupTime = startupTime;
+    }
+
+    public long getReportTime() {
+        return reportTime;
+    }
+
+    public void setReportTime(long reportTime) {
+        this.reportTime = reportTime;
+    }
+
+    public double getCpuUsage() {
+        return cpuUsage;
+    }
+
+    public void setCpuUsage(double cpuUsage) {
+        this.cpuUsage = cpuUsage;
+    }
+
+    public double getMemoryUsage() {
+        return memoryUsage;
+    }
+
+    public void setMemoryUsage(double memoryUsage) {
+        this.memoryUsage = memoryUsage;
+    }
+
+    public double getLoadAverage() {
+        return loadAverage;
+    }
+
+    public void setLoadAverage(double loadAverage) {
+        this.loadAverage = loadAverage;
+    }
+
+    public double getAvailablePhysicalMemorySize() {
+        return availablePhysicalMemorySize;
+    }
+
+    public void setAvailablePhysicalMemorySize(double availablePhysicalMemorySize) {
+        this.availablePhysicalMemorySize = availablePhysicalMemorySize;
+    }
+
+    public double getMaxCpuloadAvg() {
+        return maxCpuloadAvg;
+    }
+
+    public void setMaxCpuloadAvg(double maxCpuloadAvg) {
+        this.maxCpuloadAvg = maxCpuloadAvg;
+    }
+
+    public double getReservedMemory() {
+        return reservedMemory;
+    }
+
+    public void setReservedMemory(double reservedMemory) {
+        this.reservedMemory = reservedMemory;
+    }
+
+    public int getServerStatus() {
+        return serverStatus;
+    }
+
+    public void setServerStatus(int serverStatus) {
+        this.serverStatus = serverStatus;
+    }
+
+    public int getProcessId() {
+        return processId;
+    }
+
+    public void setProcessId(int processId) {
+        this.processId = processId;
+    }
+
+    public int getWorkerHostWeight() {
+        return workerHostWeight;
+    }
+
+    public void setWorkerHostWeight(int workerHostWeight) {
+        this.workerHostWeight = workerHostWeight;
+    }
+
+    public int getWorkerWaitingTaskCount() {
+        return workerWaitingTaskCount;
+    }
+
+    public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) {
+        this.workerWaitingTaskCount = workerWaitingTaskCount;
+    }
+
+    public int getWorkerExecThreadCount() {
+        return workerExecThreadCount;
+    }
+
+    public void setWorkerExecThreadCount(int workerExecThreadCount) {
+        this.workerExecThreadCount = workerExecThreadCount;
+    }
+
+    /**
+     * Create an empty heartbeat stamped with the current time and a normal status; used by decoding.
+     */
+    public HeartBeat() {
+        this.reportTime = System.currentTimeMillis();
+        this.serverStatus = Constants.NORMAL_NODE_STATUS;
+    }
+
+    /**
+     * Create a heartbeat without worker figures (host weight and thread count stay 0), as a master publishes.
+     *
+     * @param startupTime    server startup time in epoch millis
+     * @param maxCpuloadAvg  load average above which the node reports itself abnormal
+     * @param reservedMemory available memory (G) below which the node reports itself abnormal
+     */
+    public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory) {
+        this(startupTime, maxCpuloadAvg, reservedMemory, 0, 0);
+    }
+
+    /**
+     * Create a heartbeat for a worker.
+     *
+     * @param startupTime           server startup time in epoch millis
+     * @param maxCpuloadAvg         load average above which the node reports itself abnormal
+     * @param reservedMemory        available memory (G) below which the node reports itself abnormal
+     * @param hostWeight            worker host weight read by the master's host managers
+     * @param workerExecThreadCount number of worker execution threads
+     */
+    public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, int workerExecThreadCount) {
+        this.reportTime = System.currentTimeMillis();
+        this.serverStatus = Constants.NORMAL_NODE_STATUS;
+        this.startupTime = startupTime;
+        this.maxCpuloadAvg = maxCpuloadAvg;
+        this.reservedMemory = reservedMemory;
+        this.workerHostWeight = hostWeight;
+        this.workerExecThreadCount = workerExecThreadCount;
+    }
+
+    /**
+     * Sample the current system metrics and stamp {@code reportTime} with the current time.
+     */
+    private void fillSystemInfo() {
+        // one instance may be encoded many times, so refresh the report time on every encode
+        // instead of republishing the construction time forever
+        this.reportTime = System.currentTimeMillis();
+        this.cpuUsage = OSUtils.cpuUsage();
+        this.loadAverage = OSUtils.loadAverage();
+        this.availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
+        this.memoryUsage = OSUtils.memoryUsage();
+        this.processId = OSUtils.getProcessID();
+    }
+
+    /**
+     * Recompute {@code serverStatus} from the current figures.
+     *
+     * <p>The node is abnormal when the load average exceeds {@code maxCpuloadAvg} or available
+     * memory is below {@code reservedMemory}; otherwise busy when more tasks are waiting than
+     * there are execution threads; otherwise normal. With both worker counts at 0 (the master
+     * case) the node is never reported busy.
+     */
+    public void updateServerState() {
+        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
+            logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
+                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
+            this.serverStatus = Constants.ABNORMAL_NODE_STATUS;
+        } else if (workerWaitingTaskCount > workerExecThreadCount) {
+            logger.warn("current waiting task count {} is larger than worker thread count {}, worker is busy",
+                    workerWaitingTaskCount, workerExecThreadCount);
+            this.serverStatus = Constants.BUSY_NODE_STATUE;
+        } else {
+            this.serverStatus = Constants.NORMAL_NODE_STATUS;
+        }
+    }
+
+    /**
+     * Refresh the system metrics, report time and server status, then serialize this heartbeat.
+     *
+     * @return the comma separated heartbeat, parseable by {@link #decodeHeartBeat(String)}
+     */
+    public String encodeHeartBeat() {
+        this.fillSystemInfo();
+        this.updateServerState();
+
+        StringBuilder builder = new StringBuilder(100);
+        builder.append(cpuUsage).append(COMMA);
+        builder.append(memoryUsage).append(COMMA);
+        builder.append(loadAverage).append(COMMA);
+        builder.append(availablePhysicalMemorySize).append(COMMA);
+        builder.append(maxCpuloadAvg).append(COMMA);
+        builder.append(reservedMemory).append(COMMA);
+        builder.append(startupTime).append(COMMA);
+        builder.append(reportTime).append(COMMA);
+        builder.append(serverStatus).append(COMMA);
+        builder.append(processId).append(COMMA);
+        builder.append(workerHostWeight).append(COMMA);
+        builder.append(workerExecThreadCount).append(COMMA);
+        builder.append(workerWaitingTaskCount);
+
+        return builder.toString();
+    }
+
+    /**
+     * Parse a heartbeat string produced by {@link #encodeHeartBeat()}.
+     *
+     * @param heartBeatInfo the encoded heartbeat, may be null
+     * @return the decoded heartbeat, or null if {@code heartBeatInfo} is null, does not have exactly
+     *         {@link Constants#HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH} fields, or has a non-numeric field
+     */
+    public static HeartBeat decodeHeartBeat(String heartBeatInfo) {
+        // callers already treat null as "no usable heartbeat", so report missing or unparsable
+        // input that way instead of throwing NullPointerException / NumberFormatException
+        if (heartBeatInfo == null) {
+            return null;
+        }
+        String[] parts = heartBeatInfo.split(COMMA);
+        if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
+            return null;
+        }
+        HeartBeat heartBeat = new HeartBeat();
+        try {
+            heartBeat.cpuUsage = Double.parseDouble(parts[0]);
+            heartBeat.memoryUsage = Double.parseDouble(parts[1]);
+            heartBeat.loadAverage = Double.parseDouble(parts[2]);
+            heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]);
+            heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]);
+            heartBeat.reservedMemory = Double.parseDouble(parts[5]);
+            heartBeat.startupTime = Long.parseLong(parts[6]);
+            heartBeat.reportTime = Long.parseLong(parts[7]);
+            heartBeat.serverStatus = Integer.parseInt(parts[8]);
+            heartBeat.processId = Integer.parseInt(parts[9]);
+            heartBeat.workerHostWeight = Integer.parseInt(parts[10]);
+            heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]);
+            heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]);
+        } catch (NumberFormatException e) {
+            logger.warn("ignore malformed heartbeat info: {}", heartBeatInfo, e);
+            return null;
+        }
+        return heartBeat;
+    }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
deleted file mode 100644
index f54bd17..0000000
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.model.Server;
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * heartbeat for ZK reigster res info
- */
-public class ResInfo {
-
- /**
- * cpuUsage
- */
- private double cpuUsage;
-
- /**
- * memoryUsage
- */
- private double memoryUsage;
-
- /**
- * loadAverage
- */
- private double loadAverage;
-
- public ResInfo(double cpuUsage, double memoryUsage) {
- this.cpuUsage = cpuUsage;
- this.memoryUsage = memoryUsage;
- }
-
- public ResInfo(double cpuUsage, double memoryUsage, double loadAverage) {
- this(cpuUsage,memoryUsage);
- this.loadAverage = loadAverage;
- }
-
- public double getCpuUsage() {
- return cpuUsage;
- }
-
- public void setCpuUsage(double cpuUsage) {
- this.cpuUsage = cpuUsage;
- }
-
- public double getMemoryUsage() {
- return memoryUsage;
- }
-
- public void setMemoryUsage(double memoryUsage) {
- this.memoryUsage = memoryUsage;
- }
-
- public double getLoadAverage() {
- return loadAverage;
- }
-
- public void setLoadAverage(double loadAverage) {
- this.loadAverage = loadAverage;
- }
-
- /**
- * get CPU and memory usage
- * @param cpuUsage cpu usage
- * @param memoryUsage memory usage
- * @param loadAverage load average
- * @return cpu and memory usage
- */
- public static String getResInfoJson(double cpuUsage, double memoryUsage,
double loadAverage) {
- ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage);
- return JSONUtils.toJsonString(resInfo);
- }
-
- /**
- * parse heartbeat info for zk
- * @param heartBeatInfo heartbeat info
- * @return heartbeat info to Server
- */
- public static Server parseHeartbeatForRegistryInfo(String heartBeatInfo) {
- if (!isValidHeartbeatForRegistryInfo(heartBeatInfo)) {
- return null;
- }
- String[] parts = heartBeatInfo.split(Constants.COMMA);
- Server server = new Server();
- server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]),
- Double.parseDouble(parts[1]),
- Double.parseDouble(parts[2])));
- server.setCreateTime(DateUtils.stringToDate(parts[6]));
- server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7]));
- //set process id
- server.setId(Integer.parseInt(parts[9]));
- return server;
- }
-
- /**
- * is valid heartbeat info for zk
- * @param heartBeatInfo heartbeat info
- * @return heartbeat info is valid
- */
- public static boolean isValidHeartbeatForRegistryInfo(String
heartBeatInfo) {
- if (!StringUtils.isEmpty(heartBeatInfo)) {
- String[] parts = heartBeatInfo.split(Constants.COMMA);
- return parts.length ==
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH
- || parts.length ==
Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
- }
- return false;
- }
-
- /**
- * is new heartbeat info for zk with weight
- * @param parts heartbeat info parts
- * @return heartbeat info is new with weight
- */
- public static boolean isNewHeartbeatWithWeight(String[] parts) {
- return parts.length ==
Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
- }
-
-}
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
new file mode 100644
index 0000000..c71450f
--- /dev/null
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.dolphinscheduler.common.Constants;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link HeartBeat} state evaluation, decoding and encoding.
+ */
+public class HeartBeatTest {
+
+    @Test
+    public void testAbnormalState() {
+        long startupTime = System.currentTimeMillis();
+        double maxCpuloadAvg = 100;
+        double reservedMemory = 100;
+        HeartBeat heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
+        // nothing has sampled system info yet, so available memory is 0 and below the 100G reserve
+        heartBeat.updateServerState();
+        assertEquals(Constants.ABNORMAL_NODE_STATUS, heartBeat.getServerStatus());
+    }
+
+    @Test
+    public void testBusyState() {
+        long startupTime = System.currentTimeMillis();
+        double maxCpuloadAvg = 0;
+        double reservedMemory = 0;
+        int hostWeight = 1;
+        int taskCount = 200;
+        int workerThreadCount = 199;
+        HeartBeat heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
+
+        heartBeat.setWorkerWaitingTaskCount(taskCount);
+        heartBeat.updateServerState();
+        assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus());
+    }
+
+    @Test
+    public void testNormalStateWhenWaitingEqualsThreadCount() {
+        HeartBeat heartBeat = new HeartBeat(System.currentTimeMillis(), 100, 0, 1, 10);
+        heartBeat.setWorkerWaitingTaskCount(10);
+        heartBeat.updateServerState();
+        assertEquals(Constants.NORMAL_NODE_STATUS, heartBeat.getServerStatus());
+    }
+
+    @Test
+    public void testDecodeHeartBeat() {
+        String heartBeatInfo = "0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200";
+        HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+
+        double delta = 0.001;
+        assertEquals(0.35, heartBeat.getCpuUsage(), delta);
+        assertEquals(0.58, heartBeat.getMemoryUsage(), delta);
+        assertEquals(3.09, heartBeat.getLoadAverage(), delta);
+        assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta);
+        assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta);
+        assertEquals(1.0, heartBeat.getReservedMemory(), delta);
+        assertEquals(1634033006749L, heartBeat.getStartupTime());
+        assertEquals(1634033006857L, heartBeat.getReportTime());
+        assertEquals(1, heartBeat.getServerStatus());
+        assertEquals(29732, heartBeat.getProcessId());
+        assertEquals(1, heartBeat.getWorkerHostWeight());
+        assertEquals(199, heartBeat.getWorkerExecThreadCount());
+        assertEquals(200, heartBeat.getWorkerWaitingTaskCount());
+    }
+
+    @Test
+    public void testDecodeInvalidHeartBeat() {
+        // a wrong field count, including an empty value, yields null rather than an exception
+        assertNull(HeartBeat.decodeHeartBeat(""));
+        assertNull(HeartBeat.decodeHeartBeat("0.35,0.58,3.09"));
+    }
+
+    @Test
+    public void testEncodeDecodeRoundTrip() {
+        HeartBeat heartBeat = new HeartBeat(1634033006749L, 5.0, 1.0, 3, 199);
+        heartBeat.setWorkerWaitingTaskCount(200);
+        HeartBeat decoded = HeartBeat.decodeHeartBeat(heartBeat.encodeHeartBeat());
+
+        double delta = 0.001;
+        assertEquals(1634033006749L, decoded.getStartupTime());
+        assertEquals(5.0, decoded.getMaxCpuloadAvg(), delta);
+        assertEquals(1.0, decoded.getReservedMemory(), delta);
+        assertEquals(3, decoded.getWorkerHostWeight());
+        assertEquals(199, decoded.getWorkerExecThreadCount());
+        assertEquals(200, decoded.getWorkerWaitingTaskCount());
+        assertEquals(heartBeat.getServerStatus(), decoded.getServerStatus());
+    }
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 0e84db6..d1448b3 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -19,7 +19,7 @@ package
org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -36,7 +36,7 @@ import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
/**
- * common host manager
+ * common host manager
*/
public abstract class CommonHostManager implements HostManager {
@@ -48,6 +48,7 @@ public abstract class CommonHostManager implements
HostManager {
/**
* select host
+ *
* @param context context
* @return host
*/
@@ -87,12 +88,12 @@ public abstract class CommonHostManager implements
HostManager {
return hostWorkers;
}
- protected int getWorkerHostWeightFromHeartbeat(String heartbeat) {
+ protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) {
int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
- if (!StringUtils.isEmpty(heartbeat)) {
- String[] parts = heartbeat.split(Constants.COMMA);
- if (ResInfo.isNewHeartbeatWithWeight(parts)) {
- hostWeight = Integer.parseInt(parts[10]);
+ if (!StringUtils.isEmpty(heartBeatInfo)) {
+ HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+ if (heartBeat != null) {
+ hostWeight = heartBeat.getWorkerHostWeight();
}
}
return hostWeight;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 86ed6a8..f78b957 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -19,8 +19,7 @@ package
org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -47,7 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * lower weight host manager
+ * lower weight host manager
*/
public class LowerWeightHostManager extends CommonHostManager {
@@ -79,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager
{
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("LowerWeightHostManagerExecutor"));
- this.executorService.scheduleWithFixedDelay(new
RefreshResourceTask(),0, 5, TimeUnit.SECONDS);
+ this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),
0, 5, TimeUnit.SECONDS);
}
@PreDestroy
@@ -89,6 +88,7 @@ public class LowerWeightHostManager extends CommonHostManager
{
/**
* select host
+ *
* @param context context
* @return host
*/
@@ -153,23 +153,23 @@ public class LowerWeightHostManager extends
CommonHostManager {
}
}
- public HostWeight getHostWeight(String addr, String workerGroup,
String heartbeat) {
- if (ResInfo.isValidHeartbeatForRegistryInfo(heartbeat)) {
- String[] parts = heartbeat.split(Constants.COMMA);
- int status = Integer.parseInt(parts[8]);
- if (status == Constants.ABNORMAL_NODE_STATUS) {
- logger.warn("worker {} current cpu load average {} is too
high or available memory {}G is too low",
- addr, Double.parseDouble(parts[2]),
Double.parseDouble(parts[3]));
- return null;
- }
- double cpu = Double.parseDouble(parts[0]);
- double memory = Double.parseDouble(parts[1]);
- double loadAverage = Double.parseDouble(parts[2]);
- long startTime = DateUtils.stringToDate(parts[6]).getTime();
- int weight = getWorkerHostWeightFromHeartbeat(heartbeat);
- return new HostWeight(HostWorker.of(addr, weight,
workerGroup), cpu, memory, loadAverage, startTime);
+ public HostWeight getHostWeight(String addr, String workerGroup,
String heartBeatInfo) {
+ HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+ if (heartBeat == null) {
+ return null;
+ }
+ if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus())
{
+ logger.warn("worker {} current cpu load average {} is too high
or available memory {}G is too low",
+ addr, heartBeat.getLoadAverage(),
heartBeat.getAvailablePhysicalMemorySize());
+ return null;
+ }
+ if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
+ logger.warn("worker {} is busy, current waiting task count {}
is large than worker thread count {}",
+ addr, heartBeat.getWorkerWaitingTaskCount(),
heartBeat.getWorkerExecThreadCount());
+ return null;
}
- return null;
+ return new HostWeight(HostWorker.of(addr,
heartBeat.getWorkerHostWeight(), workerGroup),
+ heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(),
heartBeat.getLoadAverage(), heartBeat.getStartupTime());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 22de8e7..7bae6de 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -96,14 +95,14 @@ public class MasterRegistryClient {
private ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceExecMaps;
/**
- * master start time
+ * master startup time, ms
*/
- private String startTime;
+ private long startupTime;
private String localNodePath;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceExecMaps) {
- this.startTime = DateUtils.dateToString(new Date());
+ this.startupTime = System.currentTimeMillis();
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
this.processInstanceExecMaps = processInstanceExecMaps;
@@ -364,14 +363,14 @@ public class MasterRegistryClient {
String address = NetUtils.getAddr(masterConfig.getListenPort());
localNodePath = getMasterPath();
int masterHeartbeatInterval =
masterConfig.getMasterHeartbeatInterval();
- HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+ HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_TYPE,
registryClient);
- registryClient.persistEphemeral(localNodePath,
heartBeatTask.heartBeatInfo());
+ registryClient.persistEphemeral(localNodePath,
heartBeatTask.getHeartBeatInfo());
registryClient.addConnectionStateListener(new
MasterRegistryConnectStateListener());
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, masterHeartbeatInterval);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index c807877..61e8c40 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -17,15 +17,12 @@
package org.apache.dolphinscheduler.server.registry;
-import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import java.util.Date;
import java.util.Set;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,42 +33,43 @@ public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
- private String startTime;
- private double maxCpuloadAvg;
- private double reservedMemory;
- private int hostWeight; // worker host weight
private Set<String> heartBeatPaths;
- private String serverType;
private RegistryClient registryClient;
+ private WorkerManagerThread workerManagerThread;
+ private String serverType;
+ private HeartBeat heartBeat;
- public HeartBeatTask(String startTime,
+ public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient) {
- this.startTime = startTime;
- this.maxCpuloadAvg = maxCpuloadAvg;
- this.reservedMemory = reservedMemory;
this.heartBeatPaths = heartBeatPaths;
- this.serverType = serverType;
this.registryClient = registryClient;
+ this.serverType = serverType;
+ this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory);
}
- public HeartBeatTask(String startTime,
+ public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
int hostWeight,
Set<String> heartBeatPaths,
String serverType,
- RegistryClient registryClient) {
- this.startTime = startTime;
- this.maxCpuloadAvg = maxCpuloadAvg;
- this.reservedMemory = reservedMemory;
- this.hostWeight = hostWeight;
+ RegistryClient registryClient,
+ int workerThreadCount,
+ WorkerManagerThread workerManagerThread
+ ) {
this.heartBeatPaths = heartBeatPaths;
- this.serverType = serverType;
this.registryClient = registryClient;
+ this.workerManagerThread = workerManagerThread;
+ this.serverType = serverType;
+ this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory, hostWeight, workerThreadCount);
+ }
+
+ public String getHeartBeatInfo() {
+ return this.heartBeat.encodeHeartBeat();
}
@Override
@@ -85,41 +83,16 @@ public class HeartBeatTask implements Runnable {
}
}
+ if (workerManagerThread != null) {
+ // update waiting task count
+
heartBeat.setWorkerWaitingTaskCount(workerManagerThread.getThreadPoolQueueSize());
+ }
+
for (String heartBeatPath : heartBeatPaths) {
- registryClient.update(heartBeatPath, heartBeatInfo());
+ registryClient.update(heartBeatPath,
heartBeat.encodeHeartBeat());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
}
}
-
- public String heartBeatInfo() {
- double loadAverage = OSUtils.loadAverage();
- double availablePhysicalMemorySize =
OSUtils.availablePhysicalMemorySize();
- int status = Constants.NORMAL_NODE_STATUS;
- if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize <
reservedMemory) {
- logger.warn("current cpu load average {} is too high or available
memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
- loadAverage, availablePhysicalMemorySize, maxCpuloadAvg,
reservedMemory);
- status = Constants.ABNORMAL_NODE_STATUS;
- }
-
- StringBuilder builder = new StringBuilder(100);
- builder.append(OSUtils.cpuUsage()).append(COMMA);
- builder.append(OSUtils.memoryUsage()).append(COMMA);
- builder.append(OSUtils.loadAverage()).append(COMMA);
-
builder.append(OSUtils.availablePhysicalMemorySize()).append(Constants.COMMA);
- builder.append(maxCpuloadAvg).append(Constants.COMMA);
- builder.append(reservedMemory).append(Constants.COMMA);
- builder.append(startTime).append(Constants.COMMA);
- builder.append(DateUtils.dateToString(new
Date())).append(Constants.COMMA);
- builder.append(status).append(COMMA);
- // save process id
- builder.append(OSUtils.getProcessID());
- // worker host weight
- if (Constants.WORKER_TYPE.equals(serverType)) {
- builder.append(Constants.COMMA).append(hostWeight);
- }
- return builder.toString();
- }
-
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 9705b44..7c03f22 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -81,12 +81,6 @@ public class WorkerServer implements IStoppable {
private NettyRemotingServer nettyRemotingServer;
/**
- * worker registry
- */
- @Autowired
- private WorkerRegistryClient workerRegistryClient;
-
- /**
* worker config
*/
@Autowired
@@ -110,6 +104,12 @@ public class WorkerServer implements IStoppable {
@Autowired
private WorkerManagerThread workerManagerThread;
+ /**
+ * worker registry
+ */
+ @Autowired
+ private WorkerRegistryClient workerRegistryClient;
+
private TaskPluginManager taskPluginManager;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index c3720fe..76d70a4 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -172,7 +172,7 @@ public class TaskExecuteProcessor implements
NettyRequestProcessor {
// submit task to manager
if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext,
taskCallbackService, alertClientService, taskPluginManager))) {
- logger.info("submit task to manager error, queue is full, queue
size is {}", workerManager.getQueueSize());
+ logger.info("submit task to manager error, queue is full, queue
size is {}", workerManager.getDelayQueueSize());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 363b497..e8c6ad0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -24,16 +24,15 @@ import static
org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
-import java.util.Date;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
@@ -64,6 +63,12 @@ public class WorkerRegistryClient {
private WorkerConfig workerConfig;
/**
+ * worker manager
+ */
+ @Autowired
+ private WorkerManagerThread workerManagerThread;
+
+ /**
* heartbeat executor
*/
private ScheduledExecutorService heartBeatExecutor;
@@ -71,16 +76,16 @@ public class WorkerRegistryClient {
private RegistryClient registryClient;
/**
- * worker start time
+ * worker startup time, ms
*/
- private String startTime;
+ private long startupTime;
private Set<String> workerGroups;
@PostConstruct
public void initWorkRegistry() {
this.workerGroups = workerConfig.getWorkerGroups();
- this.startTime = DateUtils.dateToString(new Date());
+ this.startupTime = System.currentTimeMillis();
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
}
@@ -98,13 +103,16 @@ public class WorkerRegistryClient {
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
}
- HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+ HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getWorkerMaxCpuloadAvg(),
workerConfig.getWorkerReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_TYPE,
- registryClient);
+ registryClient,
+ workerConfig.getWorkerExecThreads(),
+ workerManagerThread
+ );
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address,
workerHeartbeatInterval);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 8319e01..4f68166 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,15 +74,24 @@ public class WorkerManagerThread implements Runnable {
}
/**
- * get queue size
+ * get delay queue size
*
* @return queue size
*/
- public int getQueueSize() {
+ public int getDelayQueueSize() {
return workerExecuteQueue.size();
}
/**
+ * get thread pool queue size
+ *
+ * @return queue size
+ */
+ public int getThreadPoolQueueSize() {
+ return ((ThreadPoolExecutor) workerExecService).getQueue().size();
+ }
+
+ /**
* Kill tasks that have not been executed, like delay task
* then send Response to Master, update the execution status of task
instance
*/
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index c6b1eb8..210ce21 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -31,12 +31,14 @@ import static
org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -93,10 +95,17 @@ public class RegistryClient extends RegistryCenter {
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
- Server server =
ResInfo.parseHeartbeatForRegistryInfo(entry.getValue());
- if (server == null) {
+ HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue());
+ if (heartBeat == null) {
continue;
}
+
+ Server server = new Server();
+ server.setResInfo(JSONUtils.toJsonString(heartBeat));
+ server.setCreateTime(new Date(heartBeat.getStartupTime()));
+ server.setLastHeartbeatTime(new Date(heartBeat.getReportTime()));
+ server.setId(heartBeat.getProcessId());
+
String key = entry.getKey();
server.setZkDirectory(parentPath + "/" + key);
// set host and port