This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5856a12 [Improvement-4984][Worker] Refactor and Improve worker load balance (#4996)
5856a12 is described below
commit 5856a12855328e67aeb6a2005f86b3c1081750a1
Author: Shiwen Cheng <[email protected]>
AuthorDate: Thu Mar 18 10:03:58 2021 +0800
[Improvement-4984][Worker] Refactor and Improve worker load balance (#4996)
* [Improvement][Remote] Improve unit tests
* [Improvement][Worker] Improve worker load balance
* [Improvement][Worker] Fix code smells
* [Improvement][Worker] Rename weight to hostWeight
---
.../api/service/impl/WorkerGroupServiceImpl.java | 13 +-
.../api/service/WorkerGroupServiceTest.java | 10 +-
.../apache/dolphinscheduler/common/Constants.java | 6 +-
.../dolphinscheduler/common/utils/ResInfo.java | 58 ++++++---
.../apache/dolphinscheduler/remote/utils/Host.java | 143 ++++-----------------
.../remote/NettyRemotingClientTest.java | 13 +-
.../{ => command/future}/ResponseFutureTest.java | 12 +-
.../log}/RemoveTaskLogRequestCommandTest.java | 10 +-
.../log}/RemoveTaskLogResponseCommandTest.java | 9 +-
.../dolphinscheduler/remote}/utils/HostTest.java | 23 +---
.../remote/{ => utils}/NettyUtilTest.java | 5 +-
.../master/dispatch/host/CommonHostManager.java | 48 ++++---
.../dispatch/host/LowerWeightHostManager.java | 60 ++++-----
.../master/dispatch/host/RandomHostManager.java | 11 +-
.../dispatch/host/RoundRobinHostManager.java | 7 +-
.../dispatch/host/assign/AbstractSelector.java | 4 +-
.../master/dispatch/host/assign/HostSelector.java | 6 +-
.../master/dispatch/host/assign/HostWeight.java | 44 +++----
.../master/dispatch/host/assign/HostWorker.java | 77 +++++++++++
.../dispatch/host/assign/RandomSelector.java | 14 +-
.../dispatch/host/assign/RoundRobinSelector.java | 27 ++--
.../server/master/registry/MasterRegistry.java | 4 +-
.../server/registry/HeartBeatTask.java | 30 ++++-
.../server/worker/config/WorkerConfig.java | 6 +-
.../server/worker/registry/WorkerRegistry.java | 24 ++--
.../dolphinscheduler/server/zk/ZKMasterClient.java | 2 -
.../dispatch/host/assign/HostWorkerTest.java} | 34 ++---
.../host/assign/LowerWeightRoundRobinTest.java | 22 ++--
.../dispatch/host/assign/RandomSelectorTest.java | 10 +-
.../host/assign/RoundRobinSelectorTest.java | 36 +++---
.../server/master/registry/MasterRegistryTest.java | 3 +-
.../service/zk/RegisterOperator.java | 4 +-
pom.xml | 16 +--
33 files changed, 388 insertions(+), 403 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 7ce6a74..e0282de 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -163,18 +163,13 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
if (CollectionUtils.isEmpty(childrenNodes)) {
continue;
}
- String timeStamp = childrenNodes.get(0);
- for (int i = 0; i < childrenNodes.size(); i++) {
- childrenNodes.set(i, Host.of(childrenNodes.get(i)).getAddressAndWeight());
- }
-
WorkerGroup wg = new WorkerGroup();
wg.setName(workerGroup);
if (isPaging) {
- wg.setIpList(childrenNodes);
- String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp);
- wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
- wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
+ wg.setIpList(childrenNodes.stream().map(node -> Host.of(node).getIp()).collect(Collectors.toList()));
+ String registeredValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + childrenNodes.get(0));
+ wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(",")[6]));
+ wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(",")[7]));
}
workerGroups.add(wg);
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index bc95ad1..ce9f239 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -70,13 +70,13 @@ public class WorkerGroupServiceTest {
workerGroupStrList.add("test");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
- List<String> defaultIpList = new ArrayList<>();
- defaultIpList.add("192.168.220.188:1234:100:1234567");
- defaultIpList.add("192.168.220.189:1234:100:1234567");
+ List<String> defaultAddressList = new ArrayList<>();
+ defaultAddressList.add("192.168.220.188:1234");
+ defaultAddressList.add("192.168.220.189:1234");
- Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath +
"/default")).thenReturn(defaultIpList);
+ Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath +
"/default")).thenReturn(defaultAddressList);
- Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultIpList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
+ Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
}
/**
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 1741257..d6d571f 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -385,6 +385,10 @@ public final class Constants {
*/
public static final double DEFAULT_WORKER_RESERVED_MEMORY =
OSUtils.totalMemorySize() / 10;
+ /**
+ * worker host weight
+ */
+ public static final int DEFAULT_WORKER_HOST_WEIGHT = 100;
/**
* default log cache rows num,output when reach the number
@@ -542,7 +546,7 @@ public final class Constants {
* heartbeat for zk info length
*/
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
-
+ public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH = 11;
/**
* jar
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
index 8a6ef1e..8f533a0 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.utils;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.model.Server;
@@ -38,10 +40,8 @@ public class ResInfo {
*/
private double loadAverage;
- public ResInfo(){}
-
- public ResInfo(double cpuUsage , double memoryUsage){
- this.cpuUsage = cpuUsage ;
+ public ResInfo(double cpuUsage, double memoryUsage) {
+ this.cpuUsage = cpuUsage;
this.memoryUsage = memoryUsage;
}
@@ -81,35 +81,53 @@ public class ResInfo {
* @param loadAverage load average
* @return cpu and memory usage
*/
- public static String getResInfoJson(double cpuUsage , double memoryUsage,double loadAverage){
+ public static String getResInfoJson(double cpuUsage, double memoryUsage, double loadAverage) {
ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage);
return JSONUtils.toJsonString(resInfo);
}
-
/**
* parse heartbeat info for zk
* @param heartBeatInfo heartbeat info
* @return heartbeat info to Server
*/
- public static Server parseHeartbeatForZKInfo(String heartBeatInfo){
- if (StringUtils.isEmpty(heartBeatInfo)) {
+ public static Server parseHeartbeatForZKInfo(String heartBeatInfo) {
+ if (!isValidHeartbeatForZKInfo(heartBeatInfo)) {
return null;
}
- String[] masterArray = heartBeatInfo.split(Constants.COMMA);
- if(masterArray.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
- return null;
+ String[] parts = heartBeatInfo.split(Constants.COMMA);
+ Server server = new Server();
+ server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]),
+ Double.parseDouble(parts[1]),
+ Double.parseDouble(parts[2])));
+ server.setCreateTime(DateUtils.stringToDate(parts[6]));
+ server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7]));
+ //set process id
+ server.setId(Integer.parseInt(parts[9]));
+ return server;
+ }
+ /**
+ * is valid heartbeat info for zk
+ * @param heartBeatInfo heartbeat info
+ * @return heartbeat info is valid
+ */
+ public static boolean isValidHeartbeatForZKInfo(String heartBeatInfo) {
+ if (StringUtils.isNotEmpty(heartBeatInfo)) {
+ String[] parts = heartBeatInfo.split(Constants.COMMA);
+ return parts.length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH
+ || parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
}
- Server masterServer = new Server();
- masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]),
- Double.parseDouble(masterArray[1]),
- Double.parseDouble(masterArray[2])));
- masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6]));
- masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7]));
- //set process id
- masterServer.setId(Integer.parseInt(masterArray[9]));
- return masterServer;
+ return false;
+ }
+
+ /**
+ * is new heartbeat info for zk with weight
+ * @param parts heartbeat info parts
+ * @return heartbeat info is new with weight
+ */
+ public static boolean isNewHeartbeatWithWeight(String[] parts) {
+ return parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
}
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 7e42984..359baef 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.remote.utils;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import java.io.Serializable;
-import java.util.Objects;
-import java.util.StringJoiner;
/**
* server address
@@ -43,21 +41,6 @@ public class Host implements Serializable {
*/
private int port;
- /**
- * weight
- */
- private int weight;
-
- /**
- * startTime
- */
- private long startTime;
-
- /**
- * workGroup
- */
- private String workGroup;
-
public Host() {
}
@@ -67,21 +50,11 @@ public class Host implements Serializable {
this.address = ip + COLON + port;
}
- public Host(String ip, int port, int weight, long startTime) {
- this.ip = ip;
- this.port = port;
- this.address = ip + COLON + port;
- this.weight = getWarmUpWeight(weight, startTime);
- this.startTime = startTime;
- }
-
- public Host(String ip, int port, int weight, long startTime, String workGroup) {
- this.ip = ip;
- this.port = port;
- this.address = ip + COLON + port;
- this.weight = getWarmUpWeight(weight, startTime);
- this.workGroup = workGroup;
- this.startTime = startTime;
+ public Host(String address) {
+ String[] parts = splitAddress(address);
+ this.ip = parts[0];
+ this.port = Integer.parseInt(parts[1]);
+ this.address = address;
}
public String getAddress() {
@@ -89,6 +62,9 @@ public class Host implements Serializable {
}
public void setAddress(String address) {
+ String[] parts = splitAddress(address);
+ this.ip = parts[0];
+ this.port = Integer.parseInt(parts[1]);
this.address = address;
}
@@ -101,22 +77,6 @@ public class Host implements Serializable {
this.address = ip + COLON + port;
}
- public int getWeight() {
- return weight;
- }
-
- public void setWeight(int weight) {
- this.weight = weight;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
public int getPort() {
return port;
}
@@ -126,12 +86,15 @@ public class Host implements Serializable {
this.address = ip + COLON + port;
}
- public String getWorkGroup() {
- return workGroup;
- }
-
- public void setWorkGroup(String workGroup) {
- this.workGroup = workGroup;
+ /**
+ * address convert host
+ *
+ * @param address address
+ * @return host
+ */
+ public static Host of(String address) {
+ String[] parts = splitAddress(address);
+ return new Host(parts[0], Integer.parseInt(parts[1]));
}
/**
@@ -140,37 +103,15 @@ public class Host implements Serializable {
* @param address address
* @return host
*/
- public static Host of(String address) {
+ public static String[] splitAddress(String address) {
if (address == null) {
throw new IllegalArgumentException("Host : address is null.");
}
String[] parts = address.split(COLON);
- if (parts.length < 2) {
+ if (parts.length != 2) {
throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
}
- Host host = null;
- if (parts.length == 2) {
- host = new Host(parts[0], Integer.parseInt(parts[1]));
- }
- if (parts.length == 4) {
- host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3]));
- }
- return host;
- }
-
- /**
- * generate host string
- * @param address address
- * @param weight weight
- * @param startTime startTime
- * @return address:weight:startTime
- */
- public static String generate(String address, int weight, long startTime) {
- StringJoiner stringJoiner = new StringJoiner(COLON);
- stringJoiner.add(address)
- .add(String.valueOf(weight))
- .add(String.valueOf(startTime));
- return stringJoiner.toString();
+ return parts;
}
/**
@@ -181,54 +122,16 @@ public class Host implements Serializable {
*/
public static Boolean isOldVersion(String address) {
String[] parts = address.split(COLON);
- return parts.length != 2 && parts.length != 3;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Host host = (Host) o;
- return Objects.equals(getAddress(), host.getAddress());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getAddress());
+ return parts.length != 2;
}
@Override
public String toString() {
return "Host{"
+ "address='" + address + '\''
- + ", weight=" + weight
- + ", startTime=" + startTime
- + ", workGroup='" + workGroup + '\''
+ + ", ip='" + ip + '\''
+ + ", port=" + port
+ '}';
}
- /**
- * warm up
- */
- private int getWarmUpWeight(int weight, long startTime) {
- long uptime = System.currentTimeMillis() - startTime;
- //If the warm-up is not over, reduce the weight
- if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
- return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME));
- }
- return weight;
- }
-
- /**
- * get address and weight
- *
- * @return address:weight
- */
- public String getAddressAndWeight() {
- return address + COLON + weight;
- }
}
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
index cfc10b2..a3f6c7b 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.remote;
-import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.Ping;
@@ -28,23 +27,25 @@ import
org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.junit.Assert;
-import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.netty.channel.Channel;
+
/**
* netty remote client test
*/
public class NettyRemotingClientTest {
-
/**
- * test sned sync
+ * test send sync
*/
@Test
- public void testSendSync(){
+ public void testSendSync() {
NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java
similarity index 93%
rename from
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java
rename to
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java
index 8836043..0190c99 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java
@@ -15,24 +15,24 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.remote;
-
+package org.apache.dolphinscheduler.remote.command.future;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.junit.Assert;
-import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+
public class ResponseFutureTest {
@Test
- public void testScanFutureTable(){
+ public void testScanFutureTable() {
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("executor-service"));
executorService.scheduleAtFixedRate(new Runnable() {
@Override
@@ -51,7 +51,7 @@ public class ResponseFutureTest {
ResponseFuture future = new ResponseFuture(1, 2000, invokeCallback,
null);
try {
latch.await(5000, TimeUnit.MILLISECONDS);
- Assert.assertTrue(ResponseFuture.getFuture(1) == null);
+ Assert.assertNull(ResponseFuture.getFuture(1));
} catch (InterruptedException e) {
e.printStackTrace();
}
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java
similarity index 83%
rename from
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java
rename to
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java
index 37c2106..10646f9 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.remote;
+package org.apache.dolphinscheduler.remote.command.log;
-import junit.framework.Assert;
import org.apache.dolphinscheduler.remote.command.Command;
-import
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
-import
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
+
import org.junit.Test;
+import junit.framework.Assert;
+
public class RemoveTaskLogRequestCommandTest {
@Test
- public void testConvert2Command(){
+ public void testConvert2Command() {
RemoveTaskLogResponseCommand removeTaskLogResponseCommand = new
RemoveTaskLogResponseCommand();
removeTaskLogResponseCommand.setStatus(true);
Command command = removeTaskLogResponseCommand.convert2Command(122);
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java
similarity index 88%
rename from
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java
rename to
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java
index aab0ad3..87dd700 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java
@@ -15,17 +15,18 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.remote;
+package org.apache.dolphinscheduler.remote.command.log;
-import junit.framework.Assert;
import org.apache.dolphinscheduler.remote.command.Command;
-import
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
+
import org.junit.Test;
+import junit.framework.Assert;
+
public class RemoveTaskLogResponseCommandTest {
@Test
- public void testConvert2Command(){
+ public void testConvert2Command() {
RemoveTaskLogRequestCommand removeTaskLogRequestCommand = new
RemoveTaskLogRequestCommand();
removeTaskLogRequestCommand.setPath("/opt/zhangsan");
Command command = removeTaskLogRequestCommand.convert2Command();
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java
similarity index 56%
rename from
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
rename to
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java
index 80ff11e..a9da007 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.utils;
-
-import org.apache.dolphinscheduler.remote.utils.Host;
+package org.apache.dolphinscheduler.remote.utils;
import org.junit.Assert;
import org.junit.Test;
@@ -28,25 +26,12 @@ import org.junit.Test;
public class HostTest {
@Test
- public void testHostWarmUp() {
- Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)));
- Assert.assertEquals(50, host.getWeight());
- host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000)));
- Assert.assertEquals(100, host.getWeight());
- }
-
- @Test
public void testHost() {
Host host = Host.of("192.158.2.2:22");
Assert.assertEquals(22, host.getPort());
+ host.setAddress("127.0.0.1:8888");
+ Assert.assertEquals("127.0.0.1", host.getIp());
+ Assert.assertEquals(8888, host.getPort());
}
- @Test
- public void testGenerate() {
- String address = "192.158.2.2:22";
- int weight = 100;
- long startTime = System.currentTimeMillis();
- String generateHost = Host.generate(address, weight, startTime);
- Assert.assertEquals(address + ":" + weight + ":" + startTime, generateHost);
- }
}
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java
similarity index 92%
rename from
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java
rename to
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java
index e95dbdd..a3e1385 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java
@@ -15,12 +15,10 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.remote;
+package org.apache.dolphinscheduler.remote.utils;
import static org.apache.dolphinscheduler.remote.utils.Constants.OS_NAME;
-import org.apache.dolphinscheduler.remote.utils.NettyUtils;
-
import org.junit.Assert;
import org.junit.Test;
@@ -31,7 +29,6 @@ import io.netty.channel.epoll.Epoll;
*/
public class NettyUtilTest {
-
@Test
public void testUserEpoll() {
if (OS_NAME.toLowerCase().contains("linux") && Epoll.isAvailable()) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 4a3d4bd..7184b9e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -17,26 +17,32 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.springframework.beans.factory.annotation.Autowired;
/**
* round robin host manager
*/
public abstract class CommonHostManager implements HostManager {
- private final Logger logger =
LoggerFactory.getLogger(CommonHostManager.class);
+ /**
+ * zookeeper registry center
+ */
+ @Autowired
+ protected ZookeeperRegistryCenter registryCenter;
/**
* zookeeperNodeManager
@@ -50,16 +56,15 @@ public abstract class CommonHostManager implements
HostManager {
* @return host
*/
@Override
- public Host select(ExecutionContext context){
+ public Host select(ExecutionContext context) {
Host host = new Host();
Collection<String> nodes = null;
- /**
- * executor type
- */
+ String workerGroup = context.getWorkerGroup();
+ // executor type
ExecutorType executorType = context.getExecutorType();
- switch (executorType){
+ switch (executorType) {
case WORKER:
- nodes =
zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
+ nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup);
break;
case CLIENT:
break;
@@ -67,21 +72,26 @@ public abstract class CommonHostManager implements
HostManager {
throw new IllegalArgumentException("invalid executorType : " +
executorType);
}
- if(CollectionUtils.isEmpty(nodes)){
+ if (nodes == null || nodes.isEmpty()) {
return host;
}
- List<Host> candidateHosts = new ArrayList<>(nodes.size());
+ List<HostWorker> candidateHosts = new ArrayList<>();
nodes.forEach(node -> {
- Host nodeHost=Host.of(node);
- nodeHost.setWorkGroup(context.getWorkerGroup());
- candidateHosts.add(nodeHost);
+ String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
+ String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
+ int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
+ if (StringUtils.isNotEmpty(heartbeat)) {
+ String[] parts = heartbeat.split(Constants.COMMA);
+ if (ResInfo.isNewHeartbeatWithWeight(parts)) {
+ hostWeight = Integer.parseInt(parts[10]);
+ }
+ }
+ candidateHosts.add(HostWorker.of(node, hostWeight, workerGroup));
});
-
-
return select(candidateHosts);
}
- protected abstract Host select(Collection<Host> nodes);
+ protected abstract HostWorker select(Collection<HostWorker> nodes);
public void setZookeeperNodeManager(ZookeeperNodeManager
zookeeperNodeManager) {
this.zookeeperNodeManager = zookeeperNodeManager;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index ac7d8b0..ef249a0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -19,20 +19,20 @@ package
org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
+import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -40,8 +40,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* round robin host manager
@@ -51,12 +54,6 @@ public class LowerWeightHostManager extends
CommonHostManager {
private final Logger logger =
LoggerFactory.getLogger(LowerWeightHostManager.class);
/**
- * zookeeper registry center
- */
- @Autowired
- private ZookeeperRegistryCenter registryCenter;
-
- /**
* round robin host manager
*/
private RoundRobinHostManager roundRobinHostManager;
@@ -82,7 +79,7 @@ public class LowerWeightHostManager extends CommonHostManager
{
private ScheduledExecutorService executorService;
@PostConstruct
- public void init(){
+ public void init() {
this.selector = new LowerWeightRoundRobin();
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
@@ -103,20 +100,20 @@ public class LowerWeightHostManager extends
CommonHostManager {
* @return host
*/
@Override
- public Host select(ExecutionContext context){
+ public Host select(ExecutionContext context) {
Set<HostWeight> workerHostWeights =
getWorkerHostWeights(context.getWorkerGroup());
- if(CollectionUtils.isNotEmpty(workerHostWeights)){
+ if (CollectionUtils.isNotEmpty(workerHostWeights)) {
return selector.select(workerHostWeights).getHost();
}
return new Host();
}
@Override
- public Host select(Collection<Host> nodes) {
+ public HostWorker select(Collection<HostWorker> nodes) {
throw new UnsupportedOperationException("not support");
}
- private void syncWorkerHostWeight(Map<String, Set<HostWeight>>
workerHostWeights){
+ private void syncWorkerHostWeight(Map<String, Set<HostWeight>>
workerHostWeights) {
lock.lock();
try {
workerHostWeightsMap.clear();
@@ -126,7 +123,7 @@ public class LowerWeightHostManager extends
CommonHostManager {
}
}
- private Set<HostWeight> getWorkerHostWeights(String workerGroup){
+ private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
lock.lock();
try {
return workerHostWeightsMap.get(workerGroup);
@@ -135,7 +132,7 @@ public class LowerWeightHostManager extends
CommonHostManager {
}
}
- class RefreshResourceTask implements Runnable{
+ class RefreshResourceTask implements Runnable {
@Override
public void run() {
@@ -143,35 +140,34 @@ public class LowerWeightHostManager extends
CommonHostManager {
Map<String, Set<String>> workerGroupNodes =
zookeeperNodeManager.getWorkerGroupNodes();
Set<Map.Entry<String, Set<String>>> entries =
workerGroupNodes.entrySet();
Map<String, Set<HostWeight>> workerHostWeights = new
HashMap<>();
- for(Map.Entry<String, Set<String>> entry : entries){
+ for (Map.Entry<String, Set<String>> entry : entries) {
String workerGroup = entry.getKey();
Set<String> nodes = entry.getValue();
String workerGroupPath =
registryCenter.getWorkerGroupPath(workerGroup);
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
- for(String node : nodes){
+ for (String node : nodes) {
String heartbeat =
registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
- if(StringUtils.isNotEmpty(heartbeat)
- && heartbeat.split(COMMA).length ==
Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
- String[] parts = heartbeat.split(COMMA);
-
+ if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) {
+ String[] parts = heartbeat.split(Constants.COMMA);
int status = Integer.parseInt(parts[8]);
- if (status == Constants.ABNORMAL_NODE_STATUS){
+ if (status == Constants.ABNORMAL_NODE_STATUS) {
logger.warn("load is too high or
availablePhysicalMemorySize(G) is too low, it's
availablePhysicalMemorySize(G):{},loadAvg:{}",
Double.parseDouble(parts[3]) ,
Double.parseDouble(parts[2]));
continue;
}
-
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
- HostWeight hostWeight = new
HostWeight(Host.of(node), cpu, memory, loadAverage);
+ long startTime =
DateUtils.stringToDate(parts[6]).getTime();
+ int weight =
ResInfo.isNewHeartbeatWithWeight(parts) ? Integer.parseInt(parts[10]) :
Constants.DEFAULT_WORKER_HOST_WEIGHT;
+ HostWeight hostWeight = new
HostWeight(HostWorker.of(node, weight, workerGroup), cpu, memory, loadAverage,
startTime);
hostWeights.add(hostWeight);
}
}
workerHostWeights.put(workerGroup, hostWeights);
}
syncWorkerHostWeight(workerHostWeights);
- } catch (Throwable ex){
+ } catch (Throwable ex) {
logger.error("RefreshResourceTask error", ex);
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
index 241906a..ad4f4aa 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
@@ -17,13 +17,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
-import org.apache.dolphinscheduler.remote.utils.Host;
+import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import java.util.Collection;
-
/**
* round robin host manager
*/
@@ -32,17 +30,18 @@ public class RandomHostManager extends CommonHostManager {
/**
* selector
*/
- private final Selector<Host> selector;
+ private final RandomSelector selector;
/**
* set round robin
*/
- public RandomHostManager(){
+ public RandomHostManager() {
this.selector = new RandomSelector();
}
@Override
- public Host select(Collection<Host> nodes) {
+ public HostWorker select(Collection<HostWorker> nodes) {
return selector.select(nodes);
}
+
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index ec1945e..c6b1734 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -17,9 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
-import org.apache.dolphinscheduler.remote.utils.Host;
+import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
-import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import java.util.Collection;
@@ -32,7 +31,7 @@ public class RoundRobinHostManager extends CommonHostManager {
/**
* selector
*/
- private final Selector<Host> selector;
+ private final RoundRobinSelector selector;
/**
* set round robin
@@ -42,7 +41,7 @@ public class RoundRobinHostManager extends CommonHostManager {
}
@Override
- public Host select(Collection<Host> nodes) {
+ public HostWorker select(Collection<HostWorker> nodes) {
return selector.select(nodes);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
index 8560da9..087a5ff 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
@@ -23,7 +23,7 @@ import java.util.Collection;
/**
* AbstractSelector
*/
-public abstract class AbstractSelector<T> implements Selector<T>{
+public abstract class AbstractSelector<T> implements Selector<T> {
@Override
public T select(Collection<T> source) {
@@ -40,6 +40,6 @@ public abstract class AbstractSelector<T> implements
Selector<T>{
return doSelect(source);
}
- protected abstract T doSelect(Collection<T> source);
+ protected abstract T doSelect(Collection<T> source);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
index 145393e..0be87e2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
@@ -28,9 +28,9 @@ public enum HostSelector {
LOWERWEIGHT;
- public static HostSelector of(String selector){
- for(HostSelector hs : values()){
- if(hs.name().equalsIgnoreCase(selector)){
+ public static HostSelector of(String selector) {
+ for (HostSelector hs : values()) {
+ if (hs.name().equalsIgnoreCase(selector)) {
return hs;
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
index 839ebc8..9d7855f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -31,57 +31,55 @@ public class HostWeight {
private final int LOAD_AVERAGE_FACTOR = 70;
- private final Host host;
+ private final HostWorker hostWorker;
private final double weight;
private double currentWeight;
- public HostWeight(Host host, double cpu, double memory, double
loadAverage) {
- this.weight = getWeight(cpu, memory, loadAverage, host);
- this.host = host;
- this.currentWeight = weight;
- }
-
- public double getCurrentWeight() {
- return currentWeight;
+ public HostWeight(HostWorker hostWorker, double cpu, double memory, double
loadAverage, long startTime) {
+ this.hostWorker = hostWorker;
+ this.weight = calculateWeight(cpu, memory, loadAverage, startTime);
+ this.currentWeight = this.weight;
}
public double getWeight() {
return weight;
}
+ public double getCurrentWeight() {
+ return currentWeight;
+ }
+
public void setCurrentWeight(double currentWeight) {
this.currentWeight = currentWeight;
}
+ public HostWorker getHostWorker() {
+ return hostWorker;
+ }
+
public Host getHost() {
- return host;
+ return (Host)hostWorker;
}
@Override
public String toString() {
return "HostWeight{"
- + "host=" + host
+ + "hostWorker=" + hostWorker
+ ", weight=" + weight
+ ", currentWeight=" + currentWeight
+ '}';
}
- private double getWeight(double cpu, double memory, double loadAverage,
Host host) {
- double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR +
loadAverage * LOAD_AVERAGE_FACTOR;
- return getWarmUpWeight(host, calculateWeight);
- }
-
- /**
- * If the warm-up is not over, add the weight
- */
- private double getWarmUpWeight(Host host, double weight) {
- long startTime = host.getStartTime();
+ private double calculateWeight(double cpu, double memory, double
loadAverage, long startTime) {
+ double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR +
loadAverage * LOAD_AVERAGE_FACTOR;
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
- return weight * Constants.WARM_UP_TIME / uptime;
+ // If the warm-up is not over, add the weight
+ return calculatedWeight * Constants.WARM_UP_TIME / uptime;
}
- return weight;
+ return calculatedWeight;
}
+
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java
new file mode 100644
index 0000000..6515464
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+/**
+ * host worker
+ */
+public class HostWorker extends Host {
+
+ /**
+ * host weight
+ */
+ private int hostWeight;
+
+ /**
+ * worker group
+ */
+ private String workerGroup;
+
+ public HostWorker(String ip, int port, int hostWeight, String workerGroup)
{
+ super(ip, port);
+ this.hostWeight = hostWeight;
+ this.workerGroup = workerGroup;
+ }
+
+ public HostWorker(String address, int hostWeight, String workerGroup) {
+ super(address);
+ this.hostWeight = hostWeight;
+ this.workerGroup = workerGroup;
+ }
+
+ public int getHostWeight() {
+ return hostWeight;
+ }
+
+ public void setHostWeight(int hostWeight) {
+ this.hostWeight = hostWeight;
+ }
+
+ public String getWorkerGroup() {
+ return workerGroup;
+ }
+
+ public void setWorkerGroup(String workerGroup) {
+ this.workerGroup = workerGroup;
+ }
+
+ public static HostWorker of(String address, int hostWeight, String
workerGroup) {
+ return new HostWorker(address, hostWeight, workerGroup);
+ }
+
+ @Override
+ public String toString() {
+ return "Host{"
+ + "hostWeight=" + hostWeight
+ + ", workerGroup='" + workerGroup + '\''
+ + '}';
+ }
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index 6975127..2b7488a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.dolphinscheduler.remote.utils.Host;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -27,20 +25,20 @@ import java.util.concurrent.ThreadLocalRandom;
/**
* random selector
*/
-public class RandomSelector extends AbstractSelector<Host> {
+public class RandomSelector extends AbstractSelector<HostWorker> {
@Override
- public Host doSelect(final Collection<Host> source) {
+ public HostWorker doSelect(final Collection<HostWorker> source) {
- List<Host> hosts = new ArrayList<>(source);
+ List<HostWorker> hosts = new ArrayList<>(source);
int size = hosts.size();
int[] weights = new int[size];
int totalWeight = 0;
int index = 0;
- for (Host host : hosts) {
- totalWeight += host.getWeight();
- weights[index] = host.getWeight();
+ for (HostWorker host : hosts) {
+ totalWeight += host.getHostWeight();
+ weights[index] = host.getHostWeight();
index++;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index 34a79ac..a5d93a6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -16,20 +16,21 @@
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.springframework.stereotype.Service;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.springframework.stereotype.Service;
+
/**
* Smooth Weight Round Robin
*/
@Service
-public class RoundRobinSelector extends AbstractSelector<Host> {
+public class RoundRobinSelector extends AbstractSelector<HostWorker> {
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>>
workGroupWeightMap = new ConcurrentHashMap<>();
@@ -69,12 +70,11 @@ public class RoundRobinSelector extends
AbstractSelector<Host> {
}
-
@Override
- public Host doSelect(Collection<Host> source) {
+ public HostWorker doSelect(Collection<HostWorker> source) {
- List<Host> hosts = new ArrayList<>(source);
- String key = hosts.get(0).getWorkGroup();
+ List<HostWorker> hosts = new ArrayList<>(source);
+ String key = hosts.get(0).getWorkerGroup();
ConcurrentMap<String, WeightedRoundRobin> map =
workGroupWeightMap.get(key);
if (map == null) {
workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
@@ -84,13 +84,13 @@ public class RoundRobinSelector extends
AbstractSelector<Host> {
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
- Host selectedHost = null;
+ HostWorker selectedHost = null;
WeightedRoundRobin selectWeightRoundRobin = null;
- for (Host host : hosts) {
- String workGroupHost = host.getWorkGroup() + host.getAddress();
+ for (HostWorker host : hosts) {
+ String workGroupHost = host.getWorkerGroup() + host.getAddress();
WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
- int weight = host.getWeight();
+ int weight = host.getHostWeight();
if (weight < 0) {
weight = 0;
}
@@ -117,7 +117,6 @@ public class RoundRobinSelector extends
AbstractSelector<Host> {
totalWeight += weight;
}
-
if (!updateLock.get() && hosts.size() != map.size() &&
updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new
ConcurrentHashMap<>(map);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index e54fc84..f77f00d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -98,8 +98,8 @@ public class MasterRegistry {
});
int masterHeartbeatInterval =
masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
- masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(),
+ masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_PREFIX,
zookeeperRegistryCenter);
@@ -132,9 +132,7 @@ public class MasterRegistry {
* get local address
*/
private String getLocalAddress() {
-
return NetUtils.getAddr(masterConfig.getListenPort());
-
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index d5f7a6e..e4c3d6a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -38,8 +38,9 @@ public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private String startTime;
- private double reservedMemory;
private double maxCpuloadAvg;
+ private double reservedMemory;
+ private int hostWeight; // worker host weight
private Set<String> heartBeatPaths;
private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@@ -48,23 +49,38 @@ public class HeartBeatTask implements Runnable {
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
- double reservedMemory,
double maxCpuloadAvg,
+ double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime;
- this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg;
+ this.reservedMemory = reservedMemory;
this.heartBeatPaths = heartBeatPaths;
+ this.serverType = serverType;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+ }
+
+ public HeartBeatTask(String startTime,
+ double maxCpuloadAvg,
+ double reservedMemory,
+ int hostWeight,
+ Set<String> heartBeatPaths,
+ String serverType,
+ ZookeeperRegistryCenter zookeeperRegistryCenter) {
+ this.startTime = startTime;
+ this.maxCpuloadAvg = maxCpuloadAvg;
+ this.reservedMemory = reservedMemory;
+ this.hostWeight = hostWeight;
+ this.heartBeatPaths = heartBeatPaths;
this.serverType = serverType;
+ this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
@Override
public void run() {
try {
-
// check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) {
if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath,
serverType)) {
@@ -94,8 +110,12 @@ public class HeartBeatTask implements Runnable {
builder.append(startTime).append(Constants.COMMA);
builder.append(DateUtils.dateToString(new
Date())).append(Constants.COMMA);
builder.append(status).append(COMMA);
- //save process id
+ // save process id
builder.append(OSUtils.getProcessID());
+ // worker host weight
+ if (Constants.WORKER_PREFIX.equals(serverType)) {
+ builder.append(Constants.COMMA).append(hostWeight);
+ }
for (String heartBeatPath : heartBeatPaths) {
zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath,
builder.toString());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 228f6ab..0bd84f5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -47,7 +47,7 @@ public class WorkerConfig {
@Value("#{'${worker.groups:default}'.split(',')}")
private Set<String> workerGroups;
- @Value("${worker.listen.port: 1234}")
+ @Value("${worker.listen.port:1234}")
private int listenPort;
@Value("${worker.host.weight:100}")
@@ -119,8 +119,8 @@ public class WorkerConfig {
return hostWeight;
}
- public void setHostWeight(int weight) {
- this.hostWeight = weight;
+ public void setHostWeight(int hostWeight) {
+ this.hostWeight = hostWeight;
}
public String getAlertListenHost() {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 06b72a5..994fb58 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -34,6 +33,7 @@ import org.apache.curator.framework.state.ConnectionState;
import java.util.Date;
import java.util.Set;
+import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -120,12 +120,13 @@ public class WorkerRegistry {
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
}
- HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
- this.workerConfig.getWorkerReservedMemory(),
- this.workerConfig.getWorkerMaxCpuloadAvg(),
+ HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+ workerConfig.getWorkerMaxCpuloadAvg(),
+ workerConfig.getWorkerReservedMemory(),
+ workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_PREFIX,
- this.zookeeperRegistryCenter);
+ zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address,
workerHeartbeatInterval);
@@ -150,22 +151,19 @@ public class WorkerRegistry {
*/
public Set<String> getWorkerZkPaths() {
Set<String> workerZkPaths = Sets.newHashSet();
-
String address = getLocalAddress();
String workerZkPathPrefix =
this.zookeeperRegistryCenter.getWorkerPath();
- int weight = workerConfig.getHostWeight();
- long workerStartTime = System.currentTimeMillis();
for (String workGroup : this.workerGroups) {
- StringBuilder workerZkPathBuilder = new StringBuilder(100);
- workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH);
+ StringJoiner workerZkPathJoiner = new StringJoiner(SLASH);
+ workerZkPathJoiner.add(workerZkPathPrefix);
if (StringUtils.isEmpty(workGroup)) {
workGroup = DEFAULT_WORKER_GROUP;
}
// trim and lower case is need
-
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
- workerZkPathBuilder.append(Host.generate(address, weight,
workerStartTime));
- workerZkPaths.add(workerZkPathBuilder.toString());
+ workerZkPathJoiner.add(workGroup.trim().toLowerCase());
+ workerZkPathJoiner.add(address);
+ workerZkPaths.add(workerZkPathJoiner.toString());
}
return workerZkPaths;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index e813cdf..0f30a5c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.MasterServer;
@@ -308,7 +307,6 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception
*/
private void failoverWorker(String workerHost, boolean
needCheckWorkerAlive) throws Exception {
- workerHost = Host.of(workerHost).getAddress();
logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList =
processService.queryNeedFailoverTaskInstances(workerHost);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java
similarity index 51%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
copy to
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java
index 145393e..11f007b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java
@@ -17,23 +17,27 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-/**
- * host selector
- */
-public enum HostSelector {
-
- RANDOM,
+import org.junit.Assert;
+import org.junit.Test;
- ROUNDROBIN,
+public class HostWorkerTest {
- LOWERWEIGHT;
+ @Test
+ public void testHostWorker1() {
+ HostWorker hostWorker = new HostWorker("192.158.2.2", 11, 20,
"default");
+ Assert.assertEquals("192.158.2.2", hostWorker.getIp());
+ Assert.assertEquals(11, hostWorker.getPort());
+ Assert.assertEquals(20, hostWorker.getHostWeight());
+ Assert.assertEquals("default", hostWorker.getWorkerGroup());
+ }
- public static HostSelector of(String selector){
- for(HostSelector hs : values()){
- if(hs.name().equalsIgnoreCase(selector)){
- return hs;
- }
- }
- throw new IllegalArgumentException("invalid host selector : " +
selector);
+ @Test
+ public void testHostWorker2() {
+ HostWorker hostWorker = HostWorker.of("192.158.2.2:22", 80, "default");
+ Assert.assertEquals("192.158.2.2", hostWorker.getIp());
+ Assert.assertEquals(22, hostWorker.getPort());
+ Assert.assertEquals(80, hostWorker.getHostWeight());
+ Assert.assertEquals("default", hostWorker.getWorkerGroup());
}
+
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
index fd5dda0..f822f04 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
@@ -17,24 +17,20 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.dolphinscheduler.remote.utils.Host;
-
-import org.junit.Assert;
-import org.junit.Test;
-
import java.util.ArrayList;
import java.util.Collection;
+import org.junit.Assert;
+import org.junit.Test;
public class LowerWeightRoundRobinTest {
-
@Test
public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>();
- sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" +
(System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
- sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" +
(System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24));
- sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" +
(System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100,
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100,
"default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100,
"default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
@@ -55,10 +51,10 @@ public class LowerWeightRoundRobinTest {
@Test
public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>();
- sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
- sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84));
- sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84));
- sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
index 14aa7b8..2173ec8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
@@ -17,14 +17,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.dolphinscheduler.remote.utils.Host;
+import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.Collections;
-
/**
* random selector
*/
@@ -39,15 +36,14 @@ public class RandomSelectorTest {
@Test
public void testSelect1() {
RandomSelector selector = new RandomSelector();
- Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis())));
+ HostWorker result = selector.select(Arrays.asList(new HostWorker("192.168.1.1:11", 100, "default"), new HostWorker("192.168.1.2:22", 80, "default")));
Assert.assertNotNull(result);
}
@Test
public void testSelect() {
RandomSelector selector = new RandomSelector();
- Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis())));
+ HostWorker result = selector.select(Arrays.asList(new HostWorker("192.168.1.1", 11, 100, "default"), new HostWorker("192.168.1.2:", 22, 20, "default")));
Assert.assertNotNull(result);
-
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
index 1c9f492..08badd5 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
@@ -17,14 +17,12 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.dolphinscheduler.remote.utils.Host;
+import java.util.Arrays;
+import java.util.List;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.List;
-
/**
* round robin selector
*/
@@ -39,18 +37,16 @@ public class RoundRobinSelectorTest {
@Test
public void testSelect1() {
RoundRobinSelector selector = new RoundRobinSelector();
- // dismiss of server warm-up time
- long startTime = System.currentTimeMillis() - 60 * 10 * 1000;
- List<Host> hostOneList = Arrays.asList(
- new Host("192.168.1.1", 80, 20, startTime, "kris"),
- new Host("192.168.1.2", 80, 10, startTime, "kris"));
-
- List<Host> hostTwoList = Arrays.asList(
- new Host("192.168.1.1", 80, 20, startTime, "kris"),
- new Host("192.168.1.2", 80, 10, startTime, "kris"),
- new Host("192.168.1.3", 80, 10, startTime, "kris"));
-
- Host result;
+ List<HostWorker> hostOneList = Arrays.asList(
+ new HostWorker("192.168.1.1", 80, 20, "kris"),
+ new HostWorker("192.168.1.2", 80, 10, "kris"));
+
+ List<HostWorker> hostTwoList = Arrays.asList(
+ new HostWorker("192.168.1.1", 80, 20, "kris"),
+ new HostWorker("192.168.1.2", 80, 10, "kris"),
+ new HostWorker("192.168.1.3", 80, 10, "kris"));
+
+ HostWorker result;
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
@@ -93,17 +89,15 @@ public class RoundRobinSelectorTest {
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
-
}
@Test
- public void testWarmUpRoundRobinSelector() {
+ public void testWeightRoundRobinSelector() {
RoundRobinSelector selector = new RoundRobinSelector();
- Host result;
+ HostWorker result;
result = selector.select(
- Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris")));
+ Arrays.asList(new HostWorker("192.168.1.1", 11, 20, "kris"), new HostWorker("192.168.1.2", 22, 80, "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
-
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index 9b62473..8068ebd 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -59,7 +60,7 @@ public class MasterRegistryTest {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
- String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
+ String masterNodePath = masterPath + "/" + (NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
masterRegistry.unRegistry();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
index 0fd4a4f..0c6ac6d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
@@ -69,10 +69,10 @@ public class RegisterOperator extends ZookeeperCachedOperator {
}
/**
- * get host ip, string format: masterParentPath/ip
+ * get host ip:port, string format: parentPath/ip:port
*
* @param path path
- * @return host ip, string format: masterParentPath/ip
+ * @return host ip:port, string format: parentPath/ip:port
*/
protected String getHostByEventDataPath(String path) {
if (StringUtils.isEmpty(path)) {
diff --git a/pom.xml b/pom.xml
index 4662fc4..2d18af9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -896,15 +896,15 @@
<include>**/dao/datasource/MySQLDataSourceTest.java</include>
<include>**/dao/entity/TaskInstanceTest.java</include>
<include>**/dao/entity/UdfFuncTest.java</include>
- <include>**/remote/JsonSerializerTest.java</include>
- <include>**/remote/RemoveTaskLogResponseCommandTest.java</include>
- <include>**/rpc/RpcTest.java</include>
- <include>**/remote/RemoveTaskLogRequestCommandTest.java</include>
- <include>**/remote/NettyRemotingClientTest.java</include>
- <include>**/remote/NettyUtilTest.java</include>
- <include>**/remote/ResponseFutureTest.java</include>
<include>**/remote/command/alert/AlertSendRequestCommandTest.java</include>
<include>**/remote/command/alert/AlertSendResponseCommandTest.java</include>
+ <include>**/remote/command/future/ResponseFutureTest.java</include>
+ <include>**/remote/command/log/RemoveTaskLogRequestCommandTest.java</include>
+ <include>**/remote/command/log/RemoveTaskLogResponseCommandTest.java</include>
+ <include>**/remote/utils/HostTest.java</include>
+ <include>**/remote/utils/NettyUtilTest.java</include>
+ <include>**/remote/NettyRemotingClientTest.java</include>
+ <include>**/rpc/RpcTest.java</include>
+ <include>**/rpc/RpcTest.java</include>
<include>**/server/log/LoggerServerTest.java</include>
<include>**/server/entity/SQLTaskExecutionContextTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include>
@@ -919,6 +919,7 @@
<include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
<include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
+ <include>**/server/master/dispatch/host/assign/HostWorkerTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
@@ -935,7 +936,6 @@
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
- <include>**/server/utils/HostTest.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/MapReduceArgsUtilsTest.java</include>