This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5856a12  [Improvement-4984][Worker] Refactor and Improve worker load balance (#4996)
5856a12 is described below

commit 5856a12855328e67aeb6a2005f86b3c1081750a1
Author: Shiwen Cheng <[email protected]>
AuthorDate: Thu Mar 18 10:03:58 2021 +0800

    [Improvement-4984][Worker] Refactor and Improve worker load balance (#4996)
    
    * [Improvement][Remote] Improve unit tests
    
    * [Improvement][Worker] Improve worker load balance
    
    * [Improvement][Worker] Fix code smells
    
    * [Improvement][Worker] Rename weight to hostWeight
---
 .../api/service/impl/WorkerGroupServiceImpl.java   |  13 +-
 .../api/service/WorkerGroupServiceTest.java        |  10 +-
 .../apache/dolphinscheduler/common/Constants.java  |   6 +-
 .../dolphinscheduler/common/utils/ResInfo.java     |  58 ++++++---
 .../apache/dolphinscheduler/remote/utils/Host.java | 143 ++++-----------------
 .../remote/NettyRemotingClientTest.java            |  13 +-
 .../{ => command/future}/ResponseFutureTest.java   |  12 +-
 .../log}/RemoveTaskLogRequestCommandTest.java      |  10 +-
 .../log}/RemoveTaskLogResponseCommandTest.java     |   9 +-
 .../dolphinscheduler/remote}/utils/HostTest.java   |  23 +---
 .../remote/{ => utils}/NettyUtilTest.java          |   5 +-
 .../master/dispatch/host/CommonHostManager.java    |  48 ++++---
 .../dispatch/host/LowerWeightHostManager.java      |  60 ++++-----
 .../master/dispatch/host/RandomHostManager.java    |  11 +-
 .../dispatch/host/RoundRobinHostManager.java       |   7 +-
 .../dispatch/host/assign/AbstractSelector.java     |   4 +-
 .../master/dispatch/host/assign/HostSelector.java  |   6 +-
 .../master/dispatch/host/assign/HostWeight.java    |  44 +++----
 .../master/dispatch/host/assign/HostWorker.java    |  77 +++++++++++
 .../dispatch/host/assign/RandomSelector.java       |  14 +-
 .../dispatch/host/assign/RoundRobinSelector.java   |  27 ++--
 .../server/master/registry/MasterRegistry.java     |   4 +-
 .../server/registry/HeartBeatTask.java             |  30 ++++-
 .../server/worker/config/WorkerConfig.java         |   6 +-
 .../server/worker/registry/WorkerRegistry.java     |  24 ++--
 .../dolphinscheduler/server/zk/ZKMasterClient.java |   2 -
 .../dispatch/host/assign/HostWorkerTest.java}      |  34 ++---
 .../host/assign/LowerWeightRoundRobinTest.java     |  22 ++--
 .../dispatch/host/assign/RandomSelectorTest.java   |  10 +-
 .../host/assign/RoundRobinSelectorTest.java        |  36 +++---
 .../server/master/registry/MasterRegistryTest.java |   3 +-
 .../service/zk/RegisterOperator.java               |   4 +-
 pom.xml                                            |  16 +--
 33 files changed, 388 insertions(+), 403 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 7ce6a74..e0282de 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -163,18 +163,13 @@ public class WorkerGroupServiceImpl extends 
BaseServiceImpl implements WorkerGro
             if (CollectionUtils.isEmpty(childrenNodes)) {
                 continue;
             }
-            String timeStamp = childrenNodes.get(0);
-            for (int i = 0; i < childrenNodes.size(); i++) {
-                childrenNodes.set(i, 
Host.of(childrenNodes.get(i)).getAddressAndWeight());
-            }
-
             WorkerGroup wg = new WorkerGroup();
             wg.setName(workerGroup);
             if (isPaging) {
-                wg.setIpList(childrenNodes);
-                String registeredIpValue = 
zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp);
-                
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
-                
wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
+                wg.setIpList(childrenNodes.stream().map(node -> 
Host.of(node).getIp()).collect(Collectors.toList()));
+                String registeredValue = 
zookeeperCachedOperator.get(workerGroupPath + SLASH + childrenNodes.get(0));
+                
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(",")[6]));
+                
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(",")[7]));
             }
             workerGroups.add(wg);
         }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index bc95ad1..ce9f239 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -70,13 +70,13 @@ public class WorkerGroupServiceTest {
         workerGroupStrList.add("test");
         
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
 
-        List<String> defaultIpList = new ArrayList<>();
-        defaultIpList.add("192.168.220.188:1234:100:1234567");
-        defaultIpList.add("192.168.220.189:1234:100:1234567");
+        List<String> defaultAddressList = new ArrayList<>();
+        defaultAddressList.add("192.168.220.188:1234");
+        defaultAddressList.add("192.168.220.189:1234");
 
-        Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + 
"/default")).thenReturn(defaultIpList);
+        Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + 
"/default")).thenReturn(defaultAddressList);
 
-        Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" 
+ defaultIpList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 
11:17:59,2020-07-21 14:39:20,0,13238");
+        Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" 
+ 
defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 
11:17:59,2020-07-21 14:39:20,0,13238");
     }
 
     /**
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 1741257..d6d571f 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -385,6 +385,10 @@ public final class Constants {
      */
     public static final double DEFAULT_WORKER_RESERVED_MEMORY = 
OSUtils.totalMemorySize() / 10;
 
+    /**
+     * worker host weight
+     */
+    public static final int DEFAULT_WORKER_HOST_WEIGHT = 100;
 
     /**
      * default log cache rows num,output when reach the number
@@ -542,7 +546,7 @@ public final class Constants {
      * heartbeat for zk info length
      */
     public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
-
+    public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH = 
11;
 
     /**
      * jar
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
index 8a6ef1e..8f533a0 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.utils;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.model.Server;
 
@@ -38,10 +40,8 @@ public class ResInfo {
      */
     private double loadAverage;
 
-    public ResInfo(){}
-
-    public ResInfo(double cpuUsage , double memoryUsage){
-        this.cpuUsage = cpuUsage ;
+    public ResInfo(double cpuUsage, double memoryUsage) {
+        this.cpuUsage = cpuUsage;
         this.memoryUsage = memoryUsage;
     }
 
@@ -81,35 +81,53 @@ public class ResInfo {
      * @param loadAverage load average
      * @return cpu and memory usage
      */
-    public static String getResInfoJson(double cpuUsage , double 
memoryUsage,double loadAverage){
+    public static String getResInfoJson(double cpuUsage, double memoryUsage, 
double loadAverage) {
         ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage);
         return JSONUtils.toJsonString(resInfo);
     }
 
-
     /**
      * parse heartbeat info for zk
      * @param heartBeatInfo heartbeat info
      * @return heartbeat info to Server
      */
-    public static Server parseHeartbeatForZKInfo(String heartBeatInfo){
-        if (StringUtils.isEmpty(heartBeatInfo)) {
+    public static Server parseHeartbeatForZKInfo(String heartBeatInfo) {
+        if (!isValidHeartbeatForZKInfo(heartBeatInfo)) {
             return null;
         }
-        String[] masterArray = heartBeatInfo.split(Constants.COMMA);
-        if(masterArray.length != 
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
-            return null;
+        String[] parts = heartBeatInfo.split(Constants.COMMA);
+        Server server = new Server();
+        server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]),
+                Double.parseDouble(parts[1]),
+                Double.parseDouble(parts[2])));
+        server.setCreateTime(DateUtils.stringToDate(parts[6]));
+        server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7]));
+        //set process id
+        server.setId(Integer.parseInt(parts[9]));
+        return server;
+    }
 
+    /**
+     * is valid heartbeat info for zk
+     * @param heartBeatInfo heartbeat info
+     * @return heartbeat info is valid
+     */
+    public static boolean isValidHeartbeatForZKInfo(String heartBeatInfo) {
+        if (StringUtils.isNotEmpty(heartBeatInfo)) {
+            String[] parts = heartBeatInfo.split(Constants.COMMA);
+            return parts.length == 
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH
+                    || parts.length == 
Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
         }
-        Server masterServer = new Server();
-        
masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]),
-                Double.parseDouble(masterArray[1]),
-                Double.parseDouble(masterArray[2])));
-        masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6]));
-        
masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7]));
-        //set process id
-        masterServer.setId(Integer.parseInt(masterArray[9]));
-        return masterServer;
+        return false;
+    }
+
+    /**
+     * is new heartbeat info for zk with weight
+     * @param parts heartbeat info parts
+     * @return heartbeat info is new with weight
+     */
+    public static boolean isNewHeartbeatWithWeight(String[] parts) {
+        return parts.length == 
Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
     }
 
 }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 7e42984..359baef 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.remote.utils;
 import static org.apache.dolphinscheduler.common.Constants.COLON;
 
 import java.io.Serializable;
-import java.util.Objects;
-import java.util.StringJoiner;
 
 /**
  * server address
@@ -43,21 +41,6 @@ public class Host implements Serializable {
      */
     private int port;
 
-    /**
-     * weight
-     */
-    private int weight;
-
-    /**
-     * startTime
-     */
-    private long startTime;
-
-    /**
-     * workGroup
-     */
-    private String workGroup;
-
     public Host() {
     }
 
@@ -67,21 +50,11 @@ public class Host implements Serializable {
         this.address = ip + COLON + port;
     }
 
-    public Host(String ip, int port, int weight, long startTime) {
-        this.ip = ip;
-        this.port = port;
-        this.address = ip + COLON + port;
-        this.weight = getWarmUpWeight(weight, startTime);
-        this.startTime = startTime;
-    }
-
-    public Host(String ip, int port, int weight, long startTime, String 
workGroup) {
-        this.ip = ip;
-        this.port = port;
-        this.address = ip + COLON + port;
-        this.weight = getWarmUpWeight(weight, startTime);
-        this.workGroup = workGroup;
-        this.startTime = startTime;
+    public Host(String address) {
+        String[] parts = splitAddress(address);
+        this.ip = parts[0];
+        this.port = Integer.parseInt(parts[1]);
+        this.address = address;
     }
 
     public String getAddress() {
@@ -89,6 +62,9 @@ public class Host implements Serializable {
     }
 
     public void setAddress(String address) {
+        String[] parts = splitAddress(address);
+        this.ip = parts[0];
+        this.port = Integer.parseInt(parts[1]);
         this.address = address;
     }
 
@@ -101,22 +77,6 @@ public class Host implements Serializable {
         this.address = ip + COLON + port;
     }
 
-    public int getWeight() {
-        return weight;
-    }
-
-    public void setWeight(int weight) {
-        this.weight = weight;
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-    }
-
     public int getPort() {
         return port;
     }
@@ -126,12 +86,15 @@ public class Host implements Serializable {
         this.address = ip + COLON + port;
     }
 
-    public String getWorkGroup() {
-        return workGroup;
-    }
-
-    public void setWorkGroup(String workGroup) {
-        this.workGroup = workGroup;
+    /**
+     * address convert host
+     *
+     * @param address address
+     * @return host
+     */
+    public static Host of(String address) {
+        String[] parts = splitAddress(address);
+        return new Host(parts[0], Integer.parseInt(parts[1]));
     }
 
     /**
@@ -140,37 +103,15 @@ public class Host implements Serializable {
      * @param address address
      * @return host
      */
-    public static Host of(String address) {
+    public static String[] splitAddress(String address) {
         if (address == null) {
             throw new IllegalArgumentException("Host : address is null.");
         }
         String[] parts = address.split(COLON);
-        if (parts.length < 2) {
+        if (parts.length != 2) {
             throw new IllegalArgumentException(String.format("Host : %s 
illegal.", address));
         }
-        Host host = null;
-        if (parts.length == 2) {
-            host = new Host(parts[0], Integer.parseInt(parts[1]));
-        }
-        if (parts.length == 4) {
-            host = new Host(parts[0], Integer.parseInt(parts[1]), 
Integer.parseInt(parts[2]), Long.parseLong(parts[3]));
-        }
-        return host;
-    }
-
-    /**
-     * generate host string
-     * @param address address
-     * @param weight weight
-     * @param startTime startTime
-     * @return address:weight:startTime
-     */
-    public static String generate(String address, int weight, long startTime) {
-        StringJoiner stringJoiner = new StringJoiner(COLON);
-        stringJoiner.add(address)
-                .add(String.valueOf(weight))
-                .add(String.valueOf(startTime));
-        return stringJoiner.toString();
+        return parts;
     }
 
     /**
@@ -181,54 +122,16 @@ public class Host implements Serializable {
      */
     public static Boolean isOldVersion(String address) {
         String[] parts = address.split(COLON);
-        return parts.length != 2 && parts.length != 3;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Host host = (Host) o;
-        return Objects.equals(getAddress(), host.getAddress());
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(getAddress());
+        return parts.length != 2;
     }
 
     @Override
     public String toString() {
         return "Host{"
                 + "address='" + address + '\''
-                + ", weight=" + weight
-                + ", startTime=" + startTime
-                + ", workGroup='" + workGroup + '\''
+                + ", ip='" + ip + '\''
+                + ", port=" + port
                 + '}';
     }
 
-    /**
-     * warm up
-     */
-    private int getWarmUpWeight(int weight, long startTime) {
-        long uptime = System.currentTimeMillis() - startTime;
-        //If the warm-up is not over, reduce the weight
-        if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
-            return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME));
-        }
-        return weight;
-    }
-
-    /**
-     * get address and weight
-     *
-     * @return address:weight
-     */
-    public String getAddressAndWeight() {
-        return address + COLON + weight;
-    }
 }
diff --git 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
index cfc10b2..a3f6c7b 100644
--- 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.remote;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.Ping;
@@ -28,23 +27,25 @@ import 
org.apache.dolphinscheduler.remote.future.InvokeCallback;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.junit.Assert;
-import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.netty.channel.Channel;
+
 /**
  *  netty remote client test
  */
 public class NettyRemotingClientTest {
 
-
     /**
-     *  test sned sync
+     *  test send sync
      */
     @Test
-    public void testSendSync(){
+    public void testSendSync() {
         NettyServerConfig serverConfig = new NettyServerConfig();
 
         NettyRemotingServer server = new NettyRemotingServer(serverConfig);
diff --git 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java
similarity index 93%
rename from 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java
rename to 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java
index 8836043..0190c99 100644
--- 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.remote;
-
+package org.apache.dolphinscheduler.remote.command.future;
 
 import org.apache.dolphinscheduler.remote.future.InvokeCallback;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.junit.Assert;
-import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.Assert;
+import org.junit.Test;
+
 public class ResponseFutureTest {
 
     @Test
-    public void testScanFutureTable(){
+    public void testScanFutureTable() {
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("executor-service"));
         executorService.scheduleAtFixedRate(new Runnable() {
             @Override
@@ -51,7 +51,7 @@ public class ResponseFutureTest {
         ResponseFuture future = new ResponseFuture(1, 2000, invokeCallback, 
null);
         try {
             latch.await(5000, TimeUnit.MILLISECONDS);
-            Assert.assertTrue(ResponseFuture.getFuture(1) == null);
+            Assert.assertNull(ResponseFuture.getFuture(1));
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
diff --git 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java
similarity index 83%
rename from 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java
rename to 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java
index 37c2106..10646f9 100644
--- 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.remote;
+package org.apache.dolphinscheduler.remote.command.log;
 
-import junit.framework.Assert;
 import org.apache.dolphinscheduler.remote.command.Command;
-import 
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
-import 
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
+
 import org.junit.Test;
 
+import junit.framework.Assert;
+
 public class RemoveTaskLogRequestCommandTest {
 
     @Test
-    public void testConvert2Command(){
+    public void testConvert2Command() {
         RemoveTaskLogResponseCommand removeTaskLogResponseCommand = new 
RemoveTaskLogResponseCommand();
         removeTaskLogResponseCommand.setStatus(true);
         Command command = removeTaskLogResponseCommand.convert2Command(122);
diff --git 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java
similarity index 88%
rename from 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java
rename to 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java
index aab0ad3..87dd700 100644
--- 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.remote;
+package org.apache.dolphinscheduler.remote.command.log;
 
-import junit.framework.Assert;
 import org.apache.dolphinscheduler.remote.command.Command;
-import 
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
+
 import org.junit.Test;
 
+import junit.framework.Assert;
+
 public class RemoveTaskLogResponseCommandTest {
 
     @Test
-    public void testConvert2Command(){
+    public void testConvert2Command() {
         RemoveTaskLogRequestCommand removeTaskLogRequestCommand = new 
RemoveTaskLogRequestCommand();
         removeTaskLogRequestCommand.setPath("/opt/zhangsan");
         Command command = removeTaskLogRequestCommand.convert2Command();
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java
similarity index 56%
rename from 
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
rename to 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java
index 80ff11e..a9da007 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.utils;
-
-import org.apache.dolphinscheduler.remote.utils.Host;
+package org.apache.dolphinscheduler.remote.utils;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -28,25 +26,12 @@ import org.junit.Test;
 public class HostTest {
 
     @Test
-    public void testHostWarmUp() {
-        Host host = Host.of(("192.158.2.2:22:100:" + 
(System.currentTimeMillis() - 60 * 5 * 1000)));
-        Assert.assertEquals(50, host.getWeight());
-        host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 
60 * 10 * 1000)));
-        Assert.assertEquals(100, host.getWeight());
-    }
-
-    @Test
     public void testHost() {
         Host host = Host.of("192.158.2.2:22");
         Assert.assertEquals(22, host.getPort());
+        host.setAddress("127.0.0.1:8888");
+        Assert.assertEquals("127.0.0.1", host.getIp());
+        Assert.assertEquals(8888, host.getPort());
     }
 
-    @Test
-    public void testGenerate() {
-        String address = "192.158.2.2:22";
-        int weight = 100;
-        long startTime = System.currentTimeMillis();
-        String generateHost = Host.generate(address, weight, startTime);
-        Assert.assertEquals(address + ":" + weight + ":" + startTime, 
generateHost);
-    }
 }
diff --git 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java
similarity index 92%
rename from 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java
rename to 
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java
index e95dbdd..a3e1385 100644
--- 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java
@@ -15,12 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.remote;
+package org.apache.dolphinscheduler.remote.utils;
 
 import static org.apache.dolphinscheduler.remote.utils.Constants.OS_NAME;
 
-import org.apache.dolphinscheduler.remote.utils.NettyUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,7 +29,6 @@ import io.netty.channel.epoll.Epoll;
  */
 public class NettyUtilTest {
 
-
     @Test
     public void testUserEpoll() {
         if (OS_NAME.toLowerCase().contains("linux") && Epoll.isAvailable()) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 4a3d4bd..7184b9e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -17,26 +17,32 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.springframework.beans.factory.annotation.Autowired;
 
 /**
  *  round robin host manager
  */
 public abstract class CommonHostManager implements HostManager {
 
-    private final Logger logger = 
LoggerFactory.getLogger(CommonHostManager.class);
+    /**
+     * zookeeper registry center
+     */
+    @Autowired
+    protected ZookeeperRegistryCenter registryCenter;
 
     /**
      * zookeeperNodeManager
@@ -50,16 +56,15 @@ public abstract class CommonHostManager implements 
HostManager {
      * @return host
      */
     @Override
-    public Host select(ExecutionContext context){
+    public Host select(ExecutionContext context) {
         Host host = new Host();
         Collection<String> nodes = null;
-        /**
-         * executor type
-         */
+        String workerGroup = context.getWorkerGroup();
+        // executor type
         ExecutorType executorType = context.getExecutorType();
-        switch (executorType){
+        switch (executorType) {
             case WORKER:
-                nodes = 
zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
+                nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup);
                 break;
             case CLIENT:
                 break;
@@ -67,21 +72,26 @@ public abstract class CommonHostManager implements 
HostManager {
                 throw new IllegalArgumentException("invalid executorType : " + 
executorType);
 
         }
-        if(CollectionUtils.isEmpty(nodes)){
+        if (nodes == null || nodes.isEmpty()) {
             return host;
         }
-        List<Host> candidateHosts = new ArrayList<>(nodes.size());
+        List<HostWorker> candidateHosts = new ArrayList<>();
         nodes.forEach(node -> {
-            Host nodeHost=Host.of(node);
-            nodeHost.setWorkGroup(context.getWorkerGroup());
-            candidateHosts.add(nodeHost);
+            String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
+            String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
+            int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
+            if (StringUtils.isNotEmpty(heartbeat)) {
+                String[] parts = heartbeat.split(Constants.COMMA);
+                if (ResInfo.isNewHeartbeatWithWeight(parts)) {
+                    hostWeight = Integer.parseInt(parts[10]);
+                }
+            }
+            candidateHosts.add(HostWorker.of(node, hostWeight, workerGroup));
         });
-
-
         return select(candidateHosts);
     }
 
-    protected abstract Host select(Collection<Host> nodes);
+    protected abstract HostWorker select(Collection<HostWorker> nodes);
 
     public void setZookeeperNodeManager(ZookeeperNodeManager 
zookeeperNodeManager) {
         this.zookeeperNodeManager = zookeeperNodeManager;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index ac7d8b0..ef249a0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -19,20 +19,20 @@ package 
org.apache.dolphinscheduler.server.master.dispatch.host;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.ResInfo;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
+import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -40,8 +40,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *  round robin host manager
@@ -51,12 +54,6 @@ public class LowerWeightHostManager extends 
CommonHostManager {
     private final Logger logger = 
LoggerFactory.getLogger(LowerWeightHostManager.class);
 
     /**
-     * zookeeper registry center
-     */
-    @Autowired
-    private ZookeeperRegistryCenter registryCenter;
-
-    /**
      * round robin host manager
      */
     private RoundRobinHostManager roundRobinHostManager;
@@ -82,7 +79,7 @@ public class LowerWeightHostManager extends CommonHostManager 
{
     private ScheduledExecutorService executorService;
 
     @PostConstruct
-    public void init(){
+    public void init() {
         this.selector = new LowerWeightRoundRobin();
         this.workerHostWeightsMap = new ConcurrentHashMap<>();
         this.lock = new ReentrantLock();
@@ -103,20 +100,20 @@ public class LowerWeightHostManager extends 
CommonHostManager {
      * @return host
      */
     @Override
-    public Host select(ExecutionContext context){
+    public Host select(ExecutionContext context) {
         Set<HostWeight> workerHostWeights = 
getWorkerHostWeights(context.getWorkerGroup());
-        if(CollectionUtils.isNotEmpty(workerHostWeights)){
+        if (CollectionUtils.isNotEmpty(workerHostWeights)) {
             return selector.select(workerHostWeights).getHost();
         }
         return new Host();
     }
 
     @Override
-    public Host select(Collection<Host> nodes) {
+    public HostWorker select(Collection<HostWorker> nodes) {
         throw new UnsupportedOperationException("not support");
     }
 
-    private void syncWorkerHostWeight(Map<String, Set<HostWeight>> 
workerHostWeights){
+    private void syncWorkerHostWeight(Map<String, Set<HostWeight>> 
workerHostWeights) {
         lock.lock();
         try {
             workerHostWeightsMap.clear();
@@ -126,7 +123,7 @@ public class LowerWeightHostManager extends 
CommonHostManager {
         }
     }
 
-    private Set<HostWeight> getWorkerHostWeights(String workerGroup){
+    private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
         lock.lock();
         try {
             return workerHostWeightsMap.get(workerGroup);
@@ -135,7 +132,7 @@ public class LowerWeightHostManager extends 
CommonHostManager {
         }
     }
 
-    class RefreshResourceTask implements Runnable{
+    class RefreshResourceTask implements Runnable {
 
         @Override
         public void run() {
@@ -143,35 +140,34 @@ public class LowerWeightHostManager extends 
CommonHostManager {
                 Map<String, Set<String>> workerGroupNodes = 
zookeeperNodeManager.getWorkerGroupNodes();
                 Set<Map.Entry<String, Set<String>>> entries = 
workerGroupNodes.entrySet();
                 Map<String, Set<HostWeight>> workerHostWeights = new 
HashMap<>();
-                for(Map.Entry<String, Set<String>> entry : entries){
+                for (Map.Entry<String, Set<String>> entry : entries) {
                     String workerGroup = entry.getKey();
                     Set<String> nodes = entry.getValue();
                     String workerGroupPath = 
registryCenter.getWorkerGroupPath(workerGroup);
                     Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
-                    for(String node : nodes){
+                    for (String node : nodes) {
                         String heartbeat = 
registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
-                        if(StringUtils.isNotEmpty(heartbeat)
-                                && heartbeat.split(COMMA).length == 
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
-                            String[] parts = heartbeat.split(COMMA);
-
+                        if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) {
+                            String[] parts = heartbeat.split(Constants.COMMA);
                             int status = Integer.parseInt(parts[8]);
-                            if (status == Constants.ABNORMAL_NODE_STATUS){
+                            if (status == Constants.ABNORMAL_NODE_STATUS) {
                                 logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}",
                                         Double.parseDouble(parts[3]) , Double.parseDouble(parts[2]));
                                 continue;
                             }
-
                             double cpu = Double.parseDouble(parts[0]);
                             double memory = Double.parseDouble(parts[1]);
                             double loadAverage = Double.parseDouble(parts[2]);
-                            HostWeight hostWeight = new 
HostWeight(Host.of(node), cpu, memory, loadAverage);
+                            long startTime = DateUtils.stringToDate(parts[6]).getTime();
+                            int weight = ResInfo.isNewHeartbeatWithWeight(parts) ? Integer.parseInt(parts[10]) : Constants.DEFAULT_WORKER_HOST_WEIGHT;
+                            HostWeight hostWeight = new HostWeight(HostWorker.of(node, weight, workerGroup), cpu, memory, loadAverage, startTime);
                             hostWeights.add(hostWeight);
                         }
                     }
                     workerHostWeights.put(workerGroup, hostWeights);
                 }
                 syncWorkerHostWeight(workerHostWeights);
-            } catch (Throwable ex){
+            } catch (Throwable ex) {
                 logger.error("RefreshResourceTask error", ex);
             }
         }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
index 241906a..ad4f4aa 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
@@ -17,13 +17,11 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
+import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
 
 import java.util.Collection;
 
-
 /**
  *  round robin host manager
  */
@@ -32,17 +30,18 @@ public class RandomHostManager extends CommonHostManager {
     /**
      * selector
      */
-    private final Selector<Host> selector;
+    private final RandomSelector selector;
 
     /**
      * set round robin
      */
-    public RandomHostManager(){
+    public RandomHostManager() {
         this.selector = new RandomSelector();
     }
 
     @Override
-    public Host select(Collection<Host> nodes) {
+    public HostWorker select(Collection<HostWorker> nodes) {
         return selector.select(nodes);
     }
+
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index ec1945e..c6b1734 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -17,9 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
+import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
 
 import java.util.Collection;
 
@@ -32,7 +31,7 @@ public class RoundRobinHostManager extends CommonHostManager {
     /**
      * selector
      */
-    private final Selector<Host> selector;
+    private final RoundRobinSelector selector;
 
     /**
      * set round robin
@@ -42,7 +41,7 @@ public class RoundRobinHostManager extends CommonHostManager {
     }
 
     @Override
-    public Host select(Collection<Host> nodes) {
+    public HostWorker select(Collection<HostWorker> nodes) {
         return selector.select(nodes);
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
index 8560da9..087a5ff 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 /**
  *  AbstractSelector
  */
-public  abstract class AbstractSelector<T> implements Selector<T>{
+public  abstract class AbstractSelector<T> implements Selector<T> {
     @Override
     public T select(Collection<T> source) {
 
@@ -40,6 +40,6 @@ public  abstract class AbstractSelector<T> implements 
Selector<T>{
         return doSelect(source);
     }
 
-    protected abstract T  doSelect(Collection<T> source);
+    protected abstract T doSelect(Collection<T> source);
 
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
index 145393e..0be87e2 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
@@ -28,9 +28,9 @@ public enum HostSelector {
 
     LOWERWEIGHT;
 
-    public static HostSelector of(String selector){
-        for(HostSelector hs : values()){
-            if(hs.name().equalsIgnoreCase(selector)){
+    public static HostSelector of(String selector) {
+        for (HostSelector hs : values()) {
+            if (hs.name().equalsIgnoreCase(selector)) {
                 return hs;
             }
         }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
index 839ebc8..9d7855f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -31,57 +31,55 @@ public class HostWeight {
 
     private final int LOAD_AVERAGE_FACTOR = 70;
 
-    private final Host host;
+    private final HostWorker hostWorker;
 
     private final double weight;
 
     private double currentWeight;
 
-    public HostWeight(Host host, double cpu, double memory, double 
loadAverage) {
-        this.weight = getWeight(cpu, memory, loadAverage, host);
-        this.host = host;
-        this.currentWeight = weight;
-    }
-
-    public double getCurrentWeight() {
-        return currentWeight;
+    public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, long startTime) {
+        this.hostWorker = hostWorker;
+        this.weight = calculateWeight(cpu, memory, loadAverage, startTime);
+        this.currentWeight = this.weight;
     }
 
     public double getWeight() {
         return weight;
     }
 
+    public double getCurrentWeight() {
+        return currentWeight;
+    }
+
     public void setCurrentWeight(double currentWeight) {
         this.currentWeight = currentWeight;
     }
 
+    public HostWorker getHostWorker() {
+        return hostWorker;
+    }
+
     public Host getHost() {
-        return host;
+        return (Host)hostWorker;
     }
 
     @Override
     public String toString() {
         return "HostWeight{"
-            + "host=" + host
+            + "hostWorker=" + hostWorker
             + ", weight=" + weight
             + ", currentWeight=" + currentWeight
             + '}';
     }
 
-    private double getWeight(double cpu, double memory, double loadAverage, 
Host host) {
-        double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + 
loadAverage * LOAD_AVERAGE_FACTOR;
-        return getWarmUpWeight(host, calculateWeight);
-    }
-
-    /**
-     * If the warm-up is not over, add the weight
-     */
-    private double getWarmUpWeight(Host host, double weight) {
-        long startTime = host.getStartTime();
+    private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) {
+        double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
         long uptime = System.currentTimeMillis() - startTime;
         if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
-            return weight * Constants.WARM_UP_TIME / uptime;
+            // If the warm-up is not over, add the weight
+            return calculatedWeight * Constants.WARM_UP_TIME / uptime;
         }
-        return weight;
+        return calculatedWeight;
     }
+
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java
new file mode 100644
index 0000000..6515464
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+/**
+ * A worker {@link Host} that additionally carries its load-balancing host weight and its worker group.
+ */
+public class HostWorker extends Host {
+
+    /**
+     * host weight, exposed through {@link #getHostWeight()} for weighted host selection
+     */
+    private int hostWeight;
+
+    /**
+     * name of the worker group this host is associated with
+     */
+    private String workerGroup;
+
+    public HostWorker(String ip, int port, int hostWeight, String workerGroup) {
+        super(ip, port);
+        this.hostWeight = hostWeight;
+        this.workerGroup = workerGroup;
+    }
+
+    public HostWorker(String address, int hostWeight, String workerGroup) {
+        super(address);
+        this.hostWeight = hostWeight;
+        this.workerGroup = workerGroup;
+    }
+
+    public int getHostWeight() {
+        return hostWeight;
+    }
+
+    public void setHostWeight(int hostWeight) {
+        this.hostWeight = hostWeight;
+    }
+
+    public String getWorkerGroup() {
+        return workerGroup;
+    }
+
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
+    }
+
+    public static HostWorker of(String address, int hostWeight, String workerGroup) {
+        return new HostWorker(address, hostWeight, workerGroup);
+    }
+
+    @Override
+    public String toString() {
+        return "HostWorker{"
+                + "address='" + getAddress() + '\'' + ", hostWeight=" + hostWeight
+                + ", workerGroup='" + workerGroup + '\''
+                + '}';
+    }
+
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index 6975127..2b7488a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -27,20 +25,20 @@ import java.util.concurrent.ThreadLocalRandom;
 /**
  * random selector
  */
-public class RandomSelector extends AbstractSelector<Host> {
+public class RandomSelector extends AbstractSelector<HostWorker> {
 
     @Override
-    public Host doSelect(final Collection<Host> source) {
+    public HostWorker doSelect(final Collection<HostWorker> source) {
 
-        List<Host> hosts = new ArrayList<>(source);
+        List<HostWorker> hosts = new ArrayList<>(source);
         int size = hosts.size();
         int[] weights = new int[size];
         int totalWeight = 0;
         int index = 0;
 
-        for (Host host : hosts) {
-            totalWeight += host.getWeight();
-            weights[index] = host.getWeight();
+        for (HostWorker host : hosts) {
+            totalWeight += host.getHostWeight();
+            weights[index] = host.getHostWeight();
             index++;
         }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index 34a79ac..a5d93a6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -16,20 +16,21 @@
  */
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.springframework.stereotype.Service;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.springframework.stereotype.Service;
+
 /**
  * Smooth Weight Round Robin
  */
 @Service
-public class RoundRobinSelector extends AbstractSelector<Host> {
+public class RoundRobinSelector extends AbstractSelector<HostWorker> {
 
     private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> 
workGroupWeightMap = new ConcurrentHashMap<>();
 
@@ -69,12 +70,11 @@ public class RoundRobinSelector extends 
AbstractSelector<Host> {
 
     }
 
-
     @Override
-    public Host doSelect(Collection<Host> source) {
+    public HostWorker doSelect(Collection<HostWorker> source) {
 
-        List<Host> hosts = new ArrayList<>(source);
-        String key = hosts.get(0).getWorkGroup();
+        List<HostWorker> hosts = new ArrayList<>(source);
+        String key = hosts.get(0).getWorkerGroup();
         ConcurrentMap<String, WeightedRoundRobin> map = 
workGroupWeightMap.get(key);
         if (map == null) {
             workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
@@ -84,13 +84,13 @@ public class RoundRobinSelector extends 
AbstractSelector<Host> {
         int totalWeight = 0;
         long maxCurrent = Long.MIN_VALUE;
         long now = System.currentTimeMillis();
-        Host selectedHost = null;
+        HostWorker selectedHost = null;
         WeightedRoundRobin selectWeightRoundRobin = null;
 
-        for (Host host : hosts) {
-            String workGroupHost = host.getWorkGroup() + host.getAddress();
+        for (HostWorker host : hosts) {
+            String workGroupHost = host.getWorkerGroup() + host.getAddress();
             WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
-            int weight = host.getWeight();
+            int weight = host.getHostWeight();
             if (weight < 0) {
                 weight = 0;
             }
@@ -117,7 +117,6 @@ public class RoundRobinSelector extends 
AbstractSelector<Host> {
             totalWeight += weight;
         }
 
-
         if (!updateLock.get() && hosts.size() != map.size() && 
updateLock.compareAndSet(false, true)) {
             try {
                 ConcurrentMap<String, WeightedRoundRobin> newMap = new 
ConcurrentHashMap<>(map);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index e54fc84..f77f00d 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -98,8 +98,8 @@ public class MasterRegistry {
             });
         int masterHeartbeatInterval = 
masterConfig.getMasterHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
-                masterConfig.getMasterReservedMemory(),
                 masterConfig.getMasterMaxCpuloadAvg(),
+                masterConfig.getMasterReservedMemory(),
                 Sets.newHashSet(getMasterPath()),
                 Constants.MASTER_PREFIX,
                 zookeeperRegistryCenter);
@@ -132,9 +132,7 @@ public class MasterRegistry {
      * get local address
      */
     private String getLocalAddress() {
-
         return NetUtils.getAddr(masterConfig.getListenPort());
-
     }
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index d5f7a6e..e4c3d6a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -38,8 +38,9 @@ public class HeartBeatTask implements Runnable {
     private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
 
     private String startTime;
-    private double reservedMemory;
     private double maxCpuloadAvg;
+    private double reservedMemory;
+    private int hostWeight; // worker host weight
     private Set<String> heartBeatPaths;
     private String serverType;
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
@@ -48,23 +49,38 @@ public class HeartBeatTask implements Runnable {
     protected IStoppable stoppable = null;
 
     public HeartBeatTask(String startTime,
-                         double reservedMemory,
                          double maxCpuloadAvg,
+                         double reservedMemory,
                          Set<String> heartBeatPaths,
                          String serverType,
                          ZookeeperRegistryCenter zookeeperRegistryCenter) {
         this.startTime = startTime;
-        this.reservedMemory = reservedMemory;
         this.maxCpuloadAvg = maxCpuloadAvg;
+        this.reservedMemory = reservedMemory;
         this.heartBeatPaths = heartBeatPaths;
+        this.serverType = serverType;
         this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+    }
+
+    public HeartBeatTask(String startTime,
+                         double maxCpuloadAvg,
+                         double reservedMemory,
+                         int hostWeight,
+                         Set<String> heartBeatPaths,
+                         String serverType,
+                         ZookeeperRegistryCenter zookeeperRegistryCenter) {
+        this.startTime = startTime;
+        this.maxCpuloadAvg = maxCpuloadAvg;
+        this.reservedMemory = reservedMemory;
+        this.hostWeight = hostWeight;
+        this.heartBeatPaths = heartBeatPaths;
         this.serverType = serverType;
+        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
     }
 
     @Override
     public void run() {
         try {
-
             // check dead or not in zookeeper
             for (String heartBeatPath : heartBeatPaths) {
                 if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, 
serverType)) {
@@ -94,8 +110,12 @@ public class HeartBeatTask implements Runnable {
             builder.append(startTime).append(Constants.COMMA);
             builder.append(DateUtils.dateToString(new 
Date())).append(Constants.COMMA);
             builder.append(status).append(COMMA);
-            //save process id
+            // save process id
             builder.append(OSUtils.getProcessID());
+            // worker host weight
+            if (Constants.WORKER_PREFIX.equals(serverType)) {
+                builder.append(Constants.COMMA).append(hostWeight);
+            }
 
             for (String heartBeatPath : heartBeatPaths) {
                 
zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, 
builder.toString());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 228f6ab..0bd84f5 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -47,7 +47,7 @@ public class WorkerConfig {
     @Value("#{'${worker.groups:default}'.split(',')}")
     private Set<String> workerGroups;
 
-    @Value("${worker.listen.port: 1234}")
+    @Value("${worker.listen.port:1234}")
     private int listenPort;
 
     @Value("${worker.host.weight:100}")
@@ -119,8 +119,8 @@ public class WorkerConfig {
         return hostWeight;
     }
 
-    public void setHostWeight(int weight) {
-        this.hostWeight = weight;
+    public void setHostWeight(int hostWeight) {
+        this.hostWeight = hostWeight;
     }
 
     public String getAlertListenHost() {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 06b72a5..994fb58 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -34,6 +33,7 @@ import org.apache.curator.framework.state.ConnectionState;
 
 import java.util.Date;
 import java.util.Set;
+import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -120,12 +120,13 @@ public class WorkerRegistry {
             logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
         }
 
-        HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
-                this.workerConfig.getWorkerReservedMemory(),
-                this.workerConfig.getWorkerMaxCpuloadAvg(),
+        HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+                workerConfig.getWorkerMaxCpuloadAvg(),
+                workerConfig.getWorkerReservedMemory(),
+                workerConfig.getHostWeight(),
                 workerZkPaths,
                 Constants.WORKER_PREFIX,
-                this.zookeeperRegistryCenter);
+                zookeeperRegistryCenter);
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("worker node : {} heartbeat interval {} s", address, 
workerHeartbeatInterval);
@@ -150,22 +151,19 @@ public class WorkerRegistry {
      */
     public Set<String> getWorkerZkPaths() {
         Set<String> workerZkPaths = Sets.newHashSet();
-
         String address = getLocalAddress();
         String workerZkPathPrefix = 
this.zookeeperRegistryCenter.getWorkerPath();
-        int weight = workerConfig.getHostWeight();
-        long workerStartTime = System.currentTimeMillis();
 
         for (String workGroup : this.workerGroups) {
-            StringBuilder workerZkPathBuilder = new StringBuilder(100);
-            workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH);
+            StringJoiner workerZkPathJoiner = new StringJoiner(SLASH);
+            workerZkPathJoiner.add(workerZkPathPrefix);
             if (StringUtils.isEmpty(workGroup)) {
                 workGroup = DEFAULT_WORKER_GROUP;
             }
             // trim and lower case is need
-            
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
-            workerZkPathBuilder.append(Host.generate(address, weight, 
workerStartTime));
-            workerZkPaths.add(workerZkPathBuilder.toString());
+            workerZkPathJoiner.add(workGroup.trim().toLowerCase());
+            workerZkPathJoiner.add(address);
+            workerZkPaths.add(workerZkPathJoiner.toString());
         }
         return workerZkPaths;
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index e813cdf..0f30a5c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.MasterServer;
@@ -308,7 +307,6 @@ public class ZKMasterClient extends AbstractZKClient {
      * @throws Exception exception
      */
     private void failoverWorker(String workerHost, boolean 
needCheckWorkerAlive) throws Exception {
-        workerHost = Host.of(workerHost).getAddress();
         logger.info("start worker[{}] failover ...", workerHost);
         List<TaskInstance> needFailoverTaskInstanceList = 
processService.queryNeedFailoverTaskInstances(workerHost);
         for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java
similarity index 51%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
copy to 
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java
index 145393e..11f007b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java
@@ -17,23 +17,27 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-/**
- * host selector
- */
-public enum HostSelector {
-
-    RANDOM,
+import org.junit.Assert;
+import org.junit.Test;
 
-    ROUNDROBIN,
+public class HostWorkerTest {
 
-    LOWERWEIGHT;
+    @Test
+    public void testHostWorker1() {
+        HostWorker hostWorker = new HostWorker("192.158.2.2", 11, 20, 
"default");
+        Assert.assertEquals("192.158.2.2", hostWorker.getIp());
+        Assert.assertEquals(11, hostWorker.getPort());
+        Assert.assertEquals(20, hostWorker.getHostWeight());
+        Assert.assertEquals("default", hostWorker.getWorkerGroup());
+    }
 
-    public static HostSelector of(String selector){
-        for(HostSelector hs : values()){
-            if(hs.name().equalsIgnoreCase(selector)){
-                return hs;
-            }
-        }
-        throw new IllegalArgumentException("invalid host selector : " + 
selector);
+    @Test
+    public void testHostWorker2() {
+        HostWorker hostWorker = HostWorker.of("192.158.2.2:22", 80, "default");
+        Assert.assertEquals("192.158.2.2", hostWorker.getIp());
+        Assert.assertEquals(22, hostWorker.getPort());
+        Assert.assertEquals(80, hostWorker.getHostWeight());
+        Assert.assertEquals("default", hostWorker.getWorkerGroup());
     }
+
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
index fd5dda0..f822f04 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
@@ -17,24 +17,20 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
-
-import org.junit.Assert;
-import org.junit.Test;
-
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.junit.Assert;
+import org.junit.Test;
 
 public class LowerWeightRoundRobinTest {
 
-
     @Test
     public void testSelect() {
         Collection<HostWeight> sources = new ArrayList<>();
-        sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + 
(System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
-        sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + 
(System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24));
-        sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + 
(System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, 
"default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, 
"default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000));
 
         LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
         HostWeight result;
@@ -55,10 +51,10 @@ public class LowerWeightRoundRobinTest {
     @Test
     public void testWarmUpSelect() {
         Collection<HostWeight> sources = new ArrayList<>();
-        sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + 
(System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
-        sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + 
(System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84));
-        sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + 
(System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84));
-        sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + 
(System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000));
 
         LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
         HostWeight result;
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
index 14aa7b8..2173ec8 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
@@ -17,14 +17,11 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
+import java.util.Arrays;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
-
 /**
  * random selector
  */
@@ -39,15 +36,14 @@ public class RandomSelectorTest {
     @Test
     public void testSelect1() {
         RandomSelector selector = new RandomSelector();
-        Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 
80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, 
System.currentTimeMillis())));
+        HostWorker result = selector.select(Arrays.asList(new 
HostWorker("192.168.1.1:11", 100, "default"), new HostWorker("192.168.1.2:22", 
80, "default")));
         Assert.assertNotNull(result);
     }
 
     @Test
     public void testSelect() {
         RandomSelector selector = new RandomSelector();
-        Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 
80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, 
System.currentTimeMillis())));
+        HostWorker result = selector.select(Arrays.asList(new 
HostWorker("192.168.1.1", 11, 100, "default"), new HostWorker("192.168.1.2:", 
22, 20, "default")));
         Assert.assertNotNull(result);
-
     }
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
index 1c9f492..08badd5 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
@@ -17,14 +17,12 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
+import java.util.Arrays;
+import java.util.List;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * round robin selector
  */
@@ -39,18 +37,16 @@ public class RoundRobinSelectorTest {
     @Test
     public void testSelect1() {
         RoundRobinSelector selector = new RoundRobinSelector();
-        // dismiss of server warm-up time
-        long startTime = System.currentTimeMillis() - 60 * 10 * 1000;
-        List<Host> hostOneList = Arrays.asList(
-            new Host("192.168.1.1", 80, 20, startTime, "kris"),
-            new Host("192.168.1.2", 80, 10, startTime, "kris"));
-
-        List<Host> hostTwoList = Arrays.asList(
-            new Host("192.168.1.1", 80, 20, startTime, "kris"),
-            new Host("192.168.1.2", 80, 10, startTime, "kris"),
-            new Host("192.168.1.3", 80, 10, startTime, "kris"));
-
-        Host result;
+        List<HostWorker> hostOneList = Arrays.asList(
+            new HostWorker("192.168.1.1", 80, 20, "kris"),
+            new HostWorker("192.168.1.2", 80, 10, "kris"));
+
+        List<HostWorker> hostTwoList = Arrays.asList(
+            new HostWorker("192.168.1.1", 80, 20, "kris"),
+            new HostWorker("192.168.1.2", 80, 10, "kris"),
+            new HostWorker("192.168.1.3", 80, 10, "kris"));
+
+        HostWorker result;
         result = selector.select(hostOneList);
         Assert.assertEquals("192.168.1.1", result.getIp());
 
@@ -93,17 +89,15 @@ public class RoundRobinSelectorTest {
 
         result = selector.select(hostOneList);
         Assert.assertEquals("192.168.1.1", result.getIp());
-
     }
 
     @Test
-    public void testWarmUpRoundRobinSelector() {
+    public void testWeightRoundRobinSelector() {
         RoundRobinSelector selector = new RoundRobinSelector();
-        Host result;
+        HostWorker result;
         result = selector.select(
-            Arrays.asList(new Host("192.168.1.1", 80, 20, 
System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 
80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris")));
+            Arrays.asList(new HostWorker("192.168.1.1", 11, 20, "kris"), new 
HostWorker("192.168.1.2", 22, 80, "kris")));
         Assert.assertEquals("192.168.1.2", result.getIp());
-
     }
 
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index 9b62473..8068ebd 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry;
 
 import static 
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
 
+import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -59,7 +60,7 @@ public class MasterRegistryTest {
         masterRegistry.registry();
         String masterPath = zookeeperRegistryCenter.getMasterPath();
         TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); 
//wait heartbeat info write into zk node
-        String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + 
":" + masterConfig.getListenPort());
+        String masterNodePath = masterPath + "/" + 
(NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
         String heartbeat = 
zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
         Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, 
heartbeat.split(",").length);
         masterRegistry.unRegistry();
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
index 0fd4a4f..0c6ac6d 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
@@ -69,10 +69,10 @@ public class RegisterOperator extends 
ZookeeperCachedOperator {
     }
 
     /**
-     * get host ip, string format: masterParentPath/ip
+     * get host ip:port, string format: parentPath/ip:port
      *
      * @param path path
-     * @return host ip, string format: masterParentPath/ip
+     * @return host ip:port, string format: parentPath/ip:port
      */
     protected String getHostByEventDataPath(String path) {
         if (StringUtils.isEmpty(path)) {
diff --git a/pom.xml b/pom.xml
index 4662fc4..2d18af9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -896,15 +896,15 @@
                         
<include>**/dao/datasource/MySQLDataSourceTest.java</include>
                         <include>**/dao/entity/TaskInstanceTest.java</include>
                         <include>**/dao/entity/UdfFuncTest.java</include>
-                        <include>**/remote/JsonSerializerTest.java</include>
-                        
<include>**/remote/RemoveTaskLogResponseCommandTest.java</include>
-                        <include>**/rpc/RpcTest.java</include>
-                        
<include>**/remote/RemoveTaskLogRequestCommandTest.java</include>
-                        
<include>**/remote/NettyRemotingClientTest.java</include>
-                        <include>**/remote/NettyUtilTest.java</include>
-                        <include>**/remote/ResponseFutureTest.java</include>
                         
<include>**/remote/command/alert/AlertSendRequestCommandTest.java</include>
                         
<include>**/remote/command/alert/AlertSendResponseCommandTest.java</include>
+                        
<include>**/remote/command/future/ResponseFutureTest.java</include>
+                        
<include>**/remote/command/log/RemoveTaskLogRequestCommandTest.java</include>
+                        
<include>**/remote/command/log/RemoveTaskLogResponseCommandTest.java</include>
+                        <include>**/remote/utils/HostTest.java</include>
+                        <include>**/remote/utils/NettyUtilTest.java</include>
+                        
<include>**/remote/NettyRemotingClientTest.java</include>
+                        <include>**/rpc/RpcTest.java</include>
                         <include>**/server/log/LoggerServerTest.java</include>
                         
<include>**/server/entity/SQLTaskExecutionContextTest.java</include>
                         
<include>**/server/log/MasterLogFilterTest.java</include>
@@ -919,6 +919,7 @@
                         
<include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
                         
<include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
                         
<include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
+                        
<include>**/server/master/dispatch/host/assign/HostWorkerTest.java</include>
                         
<include>**/server/master/register/MasterRegistryTest.java</include>
                         
<include>**/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java</include>
                         
<include>**/server/master/AlertManagerTest.java</include>
@@ -935,7 +936,6 @@
                         
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         
<include>**/server/utils/ExecutionContextTestUtils.java</include>
-                        <include>**/server/utils/HostTest.java</include>
                         
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
                         <include>**/server/utils/LogUtilsTest.java</include>
                         
<include>**/server/utils/MapReduceArgsUtilsTest.java</include>

Reply via email to