This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 50f1766  [DS-6499][WorkerServer] report busy state when worker is overloaded (#6512)
50f1766 is described below

commit 50f1766903f27ce0960e133b30cf394187e8c5c1
Author: wind <[email protected]>
AuthorDate: Wed Oct 13 11:45:56 2021 +0800

    [DS-6499][WorkerServer] report busy state when worker is overloaded (#6512)
    
    * [DS-6499][WorkerServer] report busy state when worker is overloaded
---
 .../apache/dolphinscheduler/common/Constants.java  |   4 +-
 .../dolphinscheduler/common/utils/HeartBeat.java   | 248 +++++++++++++++++++++
 .../dolphinscheduler/common/utils/ResInfo.java     | 135 -----------
 .../common/utils/HeartBeatTest.java                |  76 +++++++
 .../master/dispatch/host/CommonHostManager.java    |  15 +-
 .../dispatch/host/LowerWeightHostManager.java      |  40 ++--
 .../master/registry/MasterRegistryClient.java      |  11 +-
 .../server/registry/HeartBeatTask.java             |  81 +++----
 .../server/worker/WorkerServer.java                |  12 +-
 .../worker/processor/TaskExecuteProcessor.java     |   2 +-
 .../worker/registry/WorkerRegistryClient.java      |  22 +-
 .../server/worker/runner/WorkerManagerThread.java  |  14 +-
 .../service/registry/RegistryClient.java           |  15 +-
 13 files changed, 432 insertions(+), 243 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 2d69504..e662347 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -521,8 +521,7 @@ public final class Constants {
     /**
      * heartbeat for zk info length
      */
-    public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
-    public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH = 
11;
+    public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 13;
 
     /**
      * jar
@@ -1029,6 +1028,7 @@ public final class Constants {
 
     public static final int NORMAL_NODE_STATUS = 0;
     public static final int ABNORMAL_NODE_STATUS = 1;
+    public static final int BUSY_NODE_STATUE = 2;
 
     public static final String START_TIME = "start time";
     public static final String END_TIME = "end time";
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
new file mode 100644
index 0000000..bec0f75
--- /dev/null
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.Constants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HeartBeat {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HeartBeat.class);
+    public static final String COMMA = ",";
+
+    private long startupTime;
+    private long reportTime;
+    private double cpuUsage;
+    private double memoryUsage;
+    private double loadAverage;
+    private double availablePhysicalMemorySize;
+    private double maxCpuloadAvg;
+    private double reservedMemory;
+    private int serverStatus;
+    private int processId;
+
+    private int workerHostWeight; // worker host weight
+    private int workerWaitingTaskCount; // worker waiting task count
+    private int workerExecThreadCount; // worker thread pool thread count
+
+    public long getStartupTime() {
+        return startupTime;
+    }
+
+    public void setStartupTime(long startupTime) {
+        this.startupTime = startupTime;
+    }
+
+    public long getReportTime() {
+        return reportTime;
+    }
+
+    public void setReportTime(long reportTime) {
+        this.reportTime = reportTime;
+    }
+
+    public double getCpuUsage() {
+        return cpuUsage;
+    }
+
+    public void setCpuUsage(double cpuUsage) {
+        this.cpuUsage = cpuUsage;
+    }
+
+    public double getMemoryUsage() {
+        return memoryUsage;
+    }
+
+    public void setMemoryUsage(double memoryUsage) {
+        this.memoryUsage = memoryUsage;
+    }
+
+    public double getLoadAverage() {
+        return loadAverage;
+    }
+
+    public void setLoadAverage(double loadAverage) {
+        this.loadAverage = loadAverage;
+    }
+
+    public double getAvailablePhysicalMemorySize() {
+        return availablePhysicalMemorySize;
+    }
+
+    public void setAvailablePhysicalMemorySize(double 
availablePhysicalMemorySize) {
+        this.availablePhysicalMemorySize = availablePhysicalMemorySize;
+    }
+
+    public double getMaxCpuloadAvg() {
+        return maxCpuloadAvg;
+    }
+
+    public void setMaxCpuloadAvg(double maxCpuloadAvg) {
+        this.maxCpuloadAvg = maxCpuloadAvg;
+    }
+
+    public double getReservedMemory() {
+        return reservedMemory;
+    }
+
+    public void setReservedMemory(double reservedMemory) {
+        this.reservedMemory = reservedMemory;
+    }
+
+    public int getServerStatus() {
+        return serverStatus;
+    }
+
+    public void setServerStatus(int serverStatus) {
+        this.serverStatus = serverStatus;
+    }
+
+    public int getProcessId() {
+        return processId;
+    }
+
+    public void setProcessId(int processId) {
+        this.processId = processId;
+    }
+
+    public int getWorkerHostWeight() {
+        return workerHostWeight;
+    }
+
+    public void setWorkerHostWeight(int workerHostWeight) {
+        this.workerHostWeight = workerHostWeight;
+    }
+
+    public int getWorkerWaitingTaskCount() {
+        return workerWaitingTaskCount;
+    }
+
+    public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) {
+        this.workerWaitingTaskCount = workerWaitingTaskCount;
+    }
+
+    public int getWorkerExecThreadCount() {
+        return workerExecThreadCount;
+    }
+
+    public void setWorkerExecThreadCount(int workerExecThreadCount) {
+        this.workerExecThreadCount = workerExecThreadCount;
+    }
+
+    public HeartBeat() {
+        this.reportTime = System.currentTimeMillis();
+        this.serverStatus = Constants.NORMAL_NODE_STATUS;
+    }
+
+    public HeartBeat(long startupTime, double maxCpuloadAvg, double 
reservedMemory) {
+        this.reportTime = System.currentTimeMillis();
+        this.serverStatus = Constants.NORMAL_NODE_STATUS;
+        this.startupTime = startupTime;
+        this.maxCpuloadAvg = maxCpuloadAvg;
+        this.reservedMemory = reservedMemory;
+    }
+
+    public HeartBeat(long startupTime, double maxCpuloadAvg, double 
reservedMemory, int hostWeight, int workerExecThreadCount) {
+        this.reportTime = System.currentTimeMillis();
+        this.serverStatus = Constants.NORMAL_NODE_STATUS;
+        this.startupTime = startupTime;
+        this.maxCpuloadAvg = maxCpuloadAvg;
+        this.reservedMemory = reservedMemory;
+        this.workerHostWeight = hostWeight;
+        this.workerExecThreadCount = workerExecThreadCount;
+    }
+
+    /**
+     * fill system info
+     */
+    private void fillSystemInfo() {
+        this.cpuUsage = OSUtils.cpuUsage();
+        this.loadAverage = OSUtils.loadAverage();
+        this.availablePhysicalMemorySize = 
OSUtils.availablePhysicalMemorySize();
+        this.memoryUsage = OSUtils.memoryUsage();
+        this.processId = OSUtils.getProcessID();
+    }
+
+    /**
+     * update server state
+     */
+    public void updateServerState() {
+        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < 
reservedMemory) {
+            logger.warn("current cpu load average {} is too high or available 
memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
+                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, 
reservedMemory);
+            this.serverStatus = Constants.ABNORMAL_NODE_STATUS;
+        } else if (workerWaitingTaskCount > workerExecThreadCount) {
+            logger.warn("current waiting task count {} is large than worker 
thread count {}, worker is busy", workerWaitingTaskCount, 
workerExecThreadCount);
+            this.serverStatus = Constants.BUSY_NODE_STATUE;
+        } else {
+            this.serverStatus = Constants.NORMAL_NODE_STATUS;
+        }
+    }
+
+    /**
+     * encode heartbeat
+     */
+    public String encodeHeartBeat() {
+        this.fillSystemInfo();
+        this.updateServerState();
+
+        StringBuilder builder = new StringBuilder(100);
+        builder.append(cpuUsage).append(COMMA);
+        builder.append(memoryUsage).append(COMMA);
+        builder.append(loadAverage).append(COMMA);
+        builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
+        builder.append(maxCpuloadAvg).append(Constants.COMMA);
+        builder.append(reservedMemory).append(Constants.COMMA);
+        builder.append(startupTime).append(Constants.COMMA);
+        builder.append(reportTime).append(Constants.COMMA);
+        builder.append(serverStatus).append(COMMA);
+        builder.append(processId).append(COMMA);
+        builder.append(workerHostWeight).append(COMMA);
+        builder.append(workerExecThreadCount).append(COMMA);
+        builder.append(workerWaitingTaskCount);
+
+        return builder.toString();
+    }
+
+    /**
+     * decode heartbeat
+     */
+    public static HeartBeat decodeHeartBeat(String heartBeatInfo) {
+        String[] parts = heartBeatInfo.split(Constants.COMMA);
+        if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
+            return null;
+        }
+        HeartBeat heartBeat = new HeartBeat();
+        heartBeat.cpuUsage = Double.parseDouble(parts[0]);
+        heartBeat.memoryUsage = Double.parseDouble(parts[1]);
+        heartBeat.loadAverage = Double.parseDouble(parts[2]);
+        heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]);
+        heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]);
+        heartBeat.reservedMemory = Double.parseDouble(parts[5]);
+        heartBeat.startupTime = Long.parseLong(parts[6]);
+        heartBeat.reportTime = Long.parseLong(parts[7]);
+        heartBeat.serverStatus = Integer.parseInt(parts[8]);
+        heartBeat.processId = Integer.parseInt(parts[9]);
+        heartBeat.workerHostWeight = Integer.parseInt(parts[10]);
+        heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]);
+        heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]);
+        return heartBeat;
+    }
+}
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
deleted file mode 100644
index f54bd17..0000000
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.model.Server;
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- *  heartbeat for ZK reigster res info
- */
-public class ResInfo {
-
-    /**
-     *  cpuUsage
-     */
-    private double cpuUsage;
-
-    /**
-     *  memoryUsage
-     */
-    private double memoryUsage;
-
-    /**
-     * loadAverage
-     */
-    private double loadAverage;
-
-    public ResInfo(double cpuUsage, double memoryUsage) {
-        this.cpuUsage = cpuUsage;
-        this.memoryUsage = memoryUsage;
-    }
-
-    public ResInfo(double cpuUsage, double memoryUsage, double loadAverage) {
-        this(cpuUsage,memoryUsage);
-        this.loadAverage = loadAverage;
-    }
-
-    public double getCpuUsage() {
-        return cpuUsage;
-    }
-
-    public void setCpuUsage(double cpuUsage) {
-        this.cpuUsage = cpuUsage;
-    }
-
-    public double getMemoryUsage() {
-        return memoryUsage;
-    }
-
-    public void setMemoryUsage(double memoryUsage) {
-        this.memoryUsage = memoryUsage;
-    }
-
-    public double getLoadAverage() {
-        return loadAverage;
-    }
-
-    public void setLoadAverage(double loadAverage) {
-        this.loadAverage = loadAverage;
-    }
-
-    /**
-     * get CPU and memory usage
-     * @param cpuUsage cpu usage
-     * @param memoryUsage memory usage
-     * @param loadAverage load average
-     * @return cpu and memory usage
-     */
-    public static String getResInfoJson(double cpuUsage, double memoryUsage, 
double loadAverage) {
-        ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage);
-        return JSONUtils.toJsonString(resInfo);
-    }
-
-    /**
-     * parse heartbeat info for zk
-     * @param heartBeatInfo heartbeat info
-     * @return heartbeat info to Server
-     */
-    public static Server parseHeartbeatForRegistryInfo(String heartBeatInfo) {
-        if (!isValidHeartbeatForRegistryInfo(heartBeatInfo)) {
-            return null;
-        }
-        String[] parts = heartBeatInfo.split(Constants.COMMA);
-        Server server = new Server();
-        server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]),
-                Double.parseDouble(parts[1]),
-                Double.parseDouble(parts[2])));
-        server.setCreateTime(DateUtils.stringToDate(parts[6]));
-        server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7]));
-        //set process id
-        server.setId(Integer.parseInt(parts[9]));
-        return server;
-    }
-
-    /**
-     * is valid heartbeat info for zk
-     * @param heartBeatInfo heartbeat info
-     * @return heartbeat info is valid
-     */
-    public static boolean isValidHeartbeatForRegistryInfo(String 
heartBeatInfo) {
-        if (!StringUtils.isEmpty(heartBeatInfo)) {
-            String[] parts = heartBeatInfo.split(Constants.COMMA);
-            return parts.length == 
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH
-                    || parts.length == 
Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
-        }
-        return false;
-    }
-
-    /**
-     * is new heartbeat info for zk with weight
-     * @param parts heartbeat info parts
-     * @return heartbeat info is new with weight
-     */
-    public static boolean isNewHeartbeatWithWeight(String[] parts) {
-        return parts.length == 
Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
-    }
-
-}
diff --git 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
new file mode 100644
index 0000000..c71450f
--- /dev/null
+++ 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.dolphinscheduler.common.Constants;
+
+import org.junit.Test;
+
+/**
+ * NetUtilsTest
+ */
+public class HeartBeatTest {
+
+    @Test
+    public void testAbnormalState() {
+        long startupTime = System.currentTimeMillis();
+        double loadAverage = 100;
+        double reservedMemory = 100;
+        HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, 
reservedMemory);
+        heartBeat.updateServerState();
+        assertEquals(Constants.ABNORMAL_NODE_STATUS, 
heartBeat.getServerStatus());
+    }
+
+    @Test
+    public void testBusyState() {
+        long startupTime = System.currentTimeMillis();
+        double loadAverage = 0;
+        double reservedMemory = 0;
+        int hostWeight = 1;
+        int taskCount = 200;
+        int workerThreadCount = 199;
+        HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, 
reservedMemory, hostWeight, workerThreadCount);
+
+        heartBeat.setWorkerWaitingTaskCount(taskCount);
+        heartBeat.updateServerState();
+        assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus());
+    }
+
+    @Test
+    public void testDecodeHeartBeat() throws Exception {
+        String heartBeatInfo = 
"0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200";
+        HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+
+        double delta = 0.001;
+        assertEquals(0.35, heartBeat.getCpuUsage(), delta);
+        assertEquals(0.58, heartBeat.getMemoryUsage(), delta);
+        assertEquals(3.09, heartBeat.getLoadAverage(), delta);
+        assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta);
+        assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta);
+        assertEquals(1.0, heartBeat.getReservedMemory(), delta);
+        assertEquals(1634033006749L, heartBeat.getStartupTime());
+        assertEquals(1634033006857L, heartBeat.getReportTime());
+        assertEquals(1, heartBeat.getServerStatus());
+        assertEquals(29732, heartBeat.getProcessId());
+        assertEquals(199, heartBeat.getWorkerExecThreadCount());
+        assertEquals(200, heartBeat.getWorkerWaitingTaskCount());
+    }
+
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 0e84db6..d1448b3 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -19,7 +19,7 @@ package 
org.apache.dolphinscheduler.server.master.dispatch.host;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -36,7 +36,7 @@ import java.util.Set;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
- *  common host manager
+ * common host manager
  */
 public abstract class CommonHostManager implements HostManager {
 
@@ -48,6 +48,7 @@ public abstract class CommonHostManager implements 
HostManager {
 
     /**
      * select host
+     *
      * @param context context
      * @return host
      */
@@ -87,12 +88,12 @@ public abstract class CommonHostManager implements 
HostManager {
         return hostWorkers;
     }
 
-    protected int getWorkerHostWeightFromHeartbeat(String heartbeat) {
+    protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) {
         int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
-        if (!StringUtils.isEmpty(heartbeat)) {
-            String[] parts = heartbeat.split(Constants.COMMA);
-            if (ResInfo.isNewHeartbeatWithWeight(parts)) {
-                hostWeight = Integer.parseInt(parts[10]);
+        if (!StringUtils.isEmpty(heartBeatInfo)) {
+            HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+            if (heartBeat != null) {
+                hostWeight = heartBeat.getWorkerHostWeight();
             }
         }
         return hostWeight;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 86ed6a8..f78b957 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -19,8 +19,7 @@ package 
org.apache.dolphinscheduler.server.master.dispatch.host;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -47,7 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *  lower weight host manager
+ * lower weight host manager
  */
 public class LowerWeightHostManager extends CommonHostManager {
 
@@ -79,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager 
{
         this.workerHostWeightsMap = new ConcurrentHashMap<>();
         this.lock = new ReentrantLock();
         this.executorService = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("LowerWeightHostManagerExecutor"));
-        this.executorService.scheduleWithFixedDelay(new 
RefreshResourceTask(),0, 5, TimeUnit.SECONDS);
+        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 
0, 5, TimeUnit.SECONDS);
     }
 
     @PreDestroy
@@ -89,6 +88,7 @@ public class LowerWeightHostManager extends CommonHostManager 
{
 
     /**
      * select host
+     *
      * @param context context
      * @return host
      */
@@ -153,23 +153,23 @@ public class LowerWeightHostManager extends 
CommonHostManager {
             }
         }
 
-        public HostWeight getHostWeight(String addr, String workerGroup, 
String heartbeat) {
-            if (ResInfo.isValidHeartbeatForRegistryInfo(heartbeat)) {
-                String[] parts = heartbeat.split(Constants.COMMA);
-                int status = Integer.parseInt(parts[8]);
-                if (status == Constants.ABNORMAL_NODE_STATUS) {
-                    logger.warn("worker {} current cpu load average {} is too 
high or available memory {}G is too low",
-                            addr, Double.parseDouble(parts[2]), 
Double.parseDouble(parts[3]));
-                    return null;
-                }
-                double cpu = Double.parseDouble(parts[0]);
-                double memory = Double.parseDouble(parts[1]);
-                double loadAverage = Double.parseDouble(parts[2]);
-                long startTime = DateUtils.stringToDate(parts[6]).getTime();
-                int weight = getWorkerHostWeightFromHeartbeat(heartbeat);
-                return new HostWeight(HostWorker.of(addr, weight, 
workerGroup), cpu, memory, loadAverage, startTime);
+        public HostWeight getHostWeight(String addr, String workerGroup, 
String heartBeatInfo) {
+            HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+            if (heartBeat == null) {
+                return null;
+            }
+            if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) 
{
+                logger.warn("worker {} current cpu load average {} is too high 
or available memory {}G is too low",
+                        addr, heartBeat.getLoadAverage(), 
heartBeat.getAvailablePhysicalMemorySize());
+                return null;
+            }
+            if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
+                logger.warn("worker {} is busy, current waiting task count {} 
is large than worker thread count {}",
+                        addr, heartBeat.getWorkerWaitingTaskCount(), 
heartBeat.getWorkerExecThreadCount());
+                return null;
             }
-            return null;
+            return new HostWeight(HostWorker.of(addr, 
heartBeat.getWorkerHostWeight(), workerGroup),
+                    heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), 
heartBeat.getLoadAverage(), heartBeat.getStartupTime());
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 22de8e7..7bae6de 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -96,14 +95,14 @@ public class MasterRegistryClient {
     private ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceExecMaps;
 
     /**
-     * master start time
+     * master startup time, ms
      */
-    private String startTime;
+    private long startupTime;
 
     private String localNodePath;
 
     public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceExecMaps) {
-        this.startTime = DateUtils.dateToString(new Date());
+        this.startupTime = System.currentTimeMillis();
         this.registryClient = RegistryClient.getInstance();
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
         this.processInstanceExecMaps = processInstanceExecMaps;
@@ -364,14 +363,14 @@ public class MasterRegistryClient {
         String address = NetUtils.getAddr(masterConfig.getListenPort());
         localNodePath = getMasterPath();
         int masterHeartbeatInterval = 
masterConfig.getMasterHeartbeatInterval();
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
                 masterConfig.getMasterMaxCpuloadAvg(),
                 masterConfig.getMasterReservedMemory(),
                 Sets.newHashSet(getMasterPath()),
                 Constants.MASTER_TYPE,
                 registryClient);
 
-        registryClient.persistEphemeral(localNodePath, 
heartBeatTask.heartBeatInfo());
+        registryClient.persistEphemeral(localNodePath, 
heartBeatTask.getHeartBeatInfo());
         registryClient.addConnectionStateListener(new 
MasterRegistryConnectStateListener());
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("master node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, masterHeartbeatInterval);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index c807877..61e8c40 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -17,15 +17,12 @@
 
 package org.apache.dolphinscheduler.server.registry;
 
-import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
-import java.util.Date;
 import java.util.Set;
 
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,42 +33,43 @@ public class HeartBeatTask implements Runnable {
 
     private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
 
-    private String startTime;
-    private double maxCpuloadAvg;
-    private double reservedMemory;
-    private int hostWeight; // worker host weight
     private Set<String> heartBeatPaths;
-    private String serverType;
     private RegistryClient registryClient;
+    private WorkerManagerThread workerManagerThread;
+    private String serverType;
+    private HeartBeat heartBeat;
 
-    public HeartBeatTask(String startTime,
+    public HeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
                          double reservedMemory,
                          Set<String> heartBeatPaths,
                          String serverType,
                          RegistryClient registryClient) {
-        this.startTime = startTime;
-        this.maxCpuloadAvg = maxCpuloadAvg;
-        this.reservedMemory = reservedMemory;
         this.heartBeatPaths = heartBeatPaths;
-        this.serverType = serverType;
         this.registryClient = registryClient;
+        this.serverType = serverType;
+        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory);
     }
 
-    public HeartBeatTask(String startTime,
+    public HeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
                          double reservedMemory,
                          int hostWeight,
                          Set<String> heartBeatPaths,
                          String serverType,
-                         RegistryClient registryClient) {
-        this.startTime = startTime;
-        this.maxCpuloadAvg = maxCpuloadAvg;
-        this.reservedMemory = reservedMemory;
-        this.hostWeight = hostWeight;
+                         RegistryClient registryClient,
+                         int workerThreadCount,
+                         WorkerManagerThread workerManagerThread
+    ) {
         this.heartBeatPaths = heartBeatPaths;
-        this.serverType = serverType;
         this.registryClient = registryClient;
+        this.workerManagerThread = workerManagerThread;
+        this.serverType = serverType;
+        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory, hostWeight, workerThreadCount);
+    }
+
+    public String getHeartBeatInfo() {
+        return this.heartBeat.encodeHeartBeat();
     }
 
     @Override
@@ -85,41 +83,16 @@ public class HeartBeatTask implements Runnable {
                 }
             }
 
+            if (workerManagerThread != null) {
+                // update waiting task count
+                
heartBeat.setWorkerWaitingTaskCount(workerManagerThread.getThreadPoolQueueSize());
+            }
+
             for (String heartBeatPath : heartBeatPaths) {
-                registryClient.update(heartBeatPath, heartBeatInfo());
+                registryClient.update(heartBeatPath, 
heartBeat.encodeHeartBeat());
             }
         } catch (Throwable ex) {
             logger.error("error write heartbeat info", ex);
         }
     }
-
-    public String heartBeatInfo() {
-        double loadAverage = OSUtils.loadAverage();
-        double availablePhysicalMemorySize = 
OSUtils.availablePhysicalMemorySize();
-        int status = Constants.NORMAL_NODE_STATUS;
-        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < 
reservedMemory) {
-            logger.warn("current cpu load average {} is too high or available 
memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
-                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, 
reservedMemory);
-            status = Constants.ABNORMAL_NODE_STATUS;
-        }
-
-        StringBuilder builder = new StringBuilder(100);
-        builder.append(OSUtils.cpuUsage()).append(COMMA);
-        builder.append(OSUtils.memoryUsage()).append(COMMA);
-        builder.append(OSUtils.loadAverage()).append(COMMA);
-        
builder.append(OSUtils.availablePhysicalMemorySize()).append(Constants.COMMA);
-        builder.append(maxCpuloadAvg).append(Constants.COMMA);
-        builder.append(reservedMemory).append(Constants.COMMA);
-        builder.append(startTime).append(Constants.COMMA);
-        builder.append(DateUtils.dateToString(new 
Date())).append(Constants.COMMA);
-        builder.append(status).append(COMMA);
-        // save process id
-        builder.append(OSUtils.getProcessID());
-        // worker host weight
-        if (Constants.WORKER_TYPE.equals(serverType)) {
-            builder.append(Constants.COMMA).append(hostWeight);
-        }
-        return builder.toString();
-    }
-
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 9705b44..7c03f22 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -81,12 +81,6 @@ public class WorkerServer implements IStoppable {
     private NettyRemotingServer nettyRemotingServer;
 
     /**
-     * worker registry
-     */
-    @Autowired
-    private WorkerRegistryClient workerRegistryClient;
-
-    /**
      * worker config
      */
     @Autowired
@@ -110,6 +104,12 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private WorkerManagerThread workerManagerThread;
 
+    /**
+     * worker registry
+     */
+    @Autowired
+    private WorkerRegistryClient workerRegistryClient;
+
     private TaskPluginManager taskPluginManager;
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index c3720fe..76d70a4 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -172,7 +172,7 @@ public class TaskExecuteProcessor implements 
NettyRequestProcessor {
 
         // submit task to manager
         if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, 
taskCallbackService, alertClientService, taskPluginManager))) {
-            logger.info("submit task to manager error, queue is full, queue 
size is {}", workerManager.getQueueSize());
+            logger.info("submit task to manager error, queue is full, queue 
size is {}", workerManager.getDelayQueueSize());
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 363b497..e8c6ad0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -24,16 +24,15 @@ import static 
org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.lang.StringUtils;
 
-import java.util.Date;
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
@@ -64,6 +63,12 @@ public class WorkerRegistryClient {
     private WorkerConfig workerConfig;
 
     /**
+     * worker manager
+     */
+    @Autowired
+    private WorkerManagerThread workerManagerThread;
+
+    /**
      * heartbeat executor
      */
     private ScheduledExecutorService heartBeatExecutor;
@@ -71,16 +76,16 @@ public class WorkerRegistryClient {
     private RegistryClient registryClient;
 
     /**
-     * worker start time
+     * worker startup time, ms
      */
-    private String startTime;
+    private long startupTime;
 
     private Set<String> workerGroups;
 
     @PostConstruct
     public void initWorkRegistry() {
         this.workerGroups = workerConfig.getWorkerGroups();
-        this.startTime = DateUtils.dateToString(new Date());
+        this.startupTime = System.currentTimeMillis();
         this.registryClient = RegistryClient.getInstance();
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
     }
@@ -98,13 +103,16 @@ public class WorkerRegistryClient {
             logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
         }
 
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
                 workerConfig.getWorkerMaxCpuloadAvg(),
                 workerConfig.getWorkerReservedMemory(),
                 workerConfig.getHostWeight(),
                 workerZkPaths,
                 Constants.WORKER_TYPE,
-                registryClient);
+                registryClient,
+                workerConfig.getWorkerExecThreads(),
+                workerManagerThread
+        );
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("worker node : {} heartbeat interval {} s", address, 
workerHeartbeatInterval);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 8319e01..4f68166 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,15 +74,24 @@ public class WorkerManagerThread implements Runnable {
     }
 
     /**
-     * get queue size
+     * get delay queue size
      *
      * @return queue size
      */
-    public int getQueueSize() {
+    public int getDelayQueueSize() {
         return workerExecuteQueue.size();
     }
 
     /**
+     * get thread pool queue size
+     *
+     * @return queue size
+     */
+    public int getThreadPoolQueueSize() {
+        return ((ThreadPoolExecutor) workerExecService).getQueue().size();
+    }
+
+    /**
      * Kill tasks that have not been executed, like delay task
      * then send Response to Master, update the execution status of task 
instance
      */
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index c6b1eb8..210ce21 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -31,12 +31,14 @@ import static 
org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import org.apache.commons.lang.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -93,10 +95,17 @@ public class RegistryClient extends RegistryCenter {
 
         List<Server> serverList = new ArrayList<>();
         for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
-            Server server = 
ResInfo.parseHeartbeatForRegistryInfo(entry.getValue());
-            if (server == null) {
+            HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue());
+            if (heartBeat == null) {
                 continue;
             }
+
+            Server server = new Server();
+            server.setResInfo(JSONUtils.toJsonString(heartBeat));
+            server.setCreateTime(new Date(heartBeat.getStartupTime()));
+            server.setLastHeartbeatTime(new Date(heartBeat.getReportTime()));
+            server.setId(heartBeat.getProcessId());
+
             String key = entry.getKey();
             server.setZkDirectory(parentPath + "/" + key);
             // set host and port

Reply via email to