This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b4af3fd [Feature-2815][server] One worker can belong to different
workergroups (#2934)
b4af3fd is described below
commit b4af3fd1764756fde4c5395639d7816d03e77bff
Author: Yichao Yang <[email protected]>
AuthorDate: Mon Jul 13 17:25:13 2020 +0800
[Feature-2815][server] One worker can belong to different workergroups
(#2934)
* [Feature-2815][server] One worker can belong to different workergroups
* Feature: Add test cases
* Update MonitorService.java
* Update MonitorService.java
Co-authored-by: dailidong <[email protected]>
---
.../api/service/MonitorService.java | 54 ++++++--
.../common/model/WorkerServerModel.java | 122 +++++++++++++++++
.../server/master/registry/MasterRegistry.java | 15 ++-
.../server/registry/HeartBeatTask.java | 33 ++---
.../server/worker/config/WorkerConfig.java | 14 +-
.../server/worker/registry/WorkerRegistry.java | 115 ++++++++--------
.../src/main/resources/worker.properties | 2 +-
.../server/worker/registry/WorkerRegistryTest.java | 148 +++++++++++++++++----
.../pages/servers/_source/zookeeperDirectories.vue | 112 ++++++++++++++++
.../home/pages/monitor/pages/servers/worker.vue | 27 +++-
.../src/js/module/i18n/locale/en_US.js | 4 +-
.../src/js/module/i18n/locale/zh_CN.js | 4 +-
pom.xml | 2 +-
13 files changed, 525 insertions(+), 127 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
index 3370961..55c4fa1 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
@@ -16,29 +16,33 @@
*/
package org.apache.dolphinscheduler.api.service;
+import static
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
-import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.WorkerServerModel;
+import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
+import com.google.common.collect.Sets;
/**
* monitor service
*/
@Service
-public class MonitorService extends BaseService{
+public class MonitorService extends BaseService {
@Autowired
private ZookeeperMonitor zookeeperMonitor;
@@ -108,15 +112,41 @@ public class MonitorService extends BaseService{
public Map<String,Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>(5);
- List<Server> masterServers = getServerListFromZK(false);
-
- result.put(Constants.DATA_LIST, masterServers);
+ List<WorkerServerModel> workerServers = getServerListFromZK(false)
+ .stream()
+ .map((Server server) -> {
+ WorkerServerModel model = new WorkerServerModel();
+ model.setId(server.getId());
+ model.setHost(server.getHost());
+ model.setPort(server.getPort());
+ model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
+ model.setResInfo(server.getResInfo());
+ model.setCreateTime(server.getCreateTime());
+ model.setLastHeartbeatTime(server.getLastHeartbeatTime());
+ return model;
+ })
+ .collect(Collectors.toList());
+
+ Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
+ .stream()
+ .collect(Collectors.toMap(
+ (WorkerServerModel worker) -> {
+ String[] s =
worker.getZkDirectories().iterator().next().split("/");
+ return s[s.length - 1];
+ }
+ , Function.identity()
+ , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
+
oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
+ return oldOne;
+ }));
+
+ result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
putMsg(result,Status.SUCCESS);
return result;
}
- public List<Server> getServerListFromZK(boolean isMaster){
+ public List<Server> getServerListFromZK(boolean isMaster) {
checkNotNull(zookeeperMonitor);
ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java
new file mode 100644
index 0000000..984124b
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.model;
+
+
+import java.util.Date;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+
+/**
+ * server
+ */
+public class WorkerServerModel {
+
+ /**
+ * id
+ */
+ private int id;
+
+ /**
+ * host
+ */
+ private String host;
+
+ /**
+ * port
+ */
+ private int port;
+
+ /**
+ * worker directories in zookeeper
+ */
+ private Set<String> zkDirectories;
+
+ /**
+ * resource info about CPU and memory
+ */
+ private String resInfo;
+
+ /**
+ * create time
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+ private Date createTime;
+
+ /**
+ * last heart beat time
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+ private Date lastHeartbeatTime;
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Set<String> getZkDirectories() {
+ return zkDirectories;
+ }
+
+ public void setZkDirectories(Set<String> zkDirectories) {
+ this.zkDirectories = zkDirectories;
+ }
+
+ public Date getLastHeartbeatTime() {
+ return lastHeartbeatTime;
+ }
+
+ public void setLastHeartbeatTime(Date lastHeartbeatTime) {
+ this.lastHeartbeatTime = lastHeartbeatTime;
+ }
+
+ public String getResInfo() {
+ return resInfo;
+ }
+
+ public void setResInfo(String resInfo) {
+ this.resInfo = resInfo;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 04fb79f..040ea5a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -16,6 +16,13 @@
*/
package org.apache.dolphinscheduler.server.master.registry;
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
@@ -30,11 +37,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import javax.annotation.PostConstruct;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
/**
* master registry
@@ -97,7 +100,7 @@ public class MasterRegistry {
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(),
- getMasterPath(),
+ Sets.newHashSet(getMasterPath()),
zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 2345ce9..bd8c79c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -17,50 +17,50 @@
package org.apache.dolphinscheduler.server.registry;
+import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
+
+import java.util.Date;
+import java.util.Set;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Date;
-
-import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
-
-public class HeartBeatTask extends Thread{
+public class HeartBeatTask extends Thread {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private String startTime;
private double reservedMemory;
private double maxCpuloadAvg;
- private String heartBeatPath;
+ private Set<String> heartBeatPaths;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public HeartBeatTask(String startTime,
double reservedMemory,
double maxCpuloadAvg,
- String heartBeatPath,
- ZookeeperRegistryCenter zookeeperRegistryCenter){
+ Set<String> heartBeatPaths,
+ ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime;
this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg;
- this.heartBeatPath = heartBeatPath;
+ this.heartBeatPaths = heartBeatPaths;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
@Override
public void run() {
try {
-
double availablePhysicalMemorySize =
OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage();
int status = Constants.NORAML_NODE_STATUS;
- if(availablePhysicalMemorySize < reservedMemory
- || loadAverage > maxCpuloadAvg){
- logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage);
+ if (availablePhysicalMemorySize < reservedMemory
+ || loadAverage > maxCpuloadAvg) {
+ logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage);
status = Constants.ABNORMAL_NODE_STATUS;
}
@@ -76,8 +76,11 @@ public class HeartBeatTask extends Thread{
builder.append(status).append(COMMA);
//save process id
builder.append(OSUtils.getProcessID());
-
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath,
builder.toString());
- } catch (Throwable ex){
+
+ for (String heartBeatPath : heartBeatPaths) {
+
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath,
builder.toString());
+ }
+ } catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 1a31fa0..2dedaf8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,6 +17,8 @@
*/
package org.apache.dolphinscheduler.server.worker.config;
+import java.util.Set;
+
import org.apache.dolphinscheduler.common.Constants;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
@@ -41,8 +43,8 @@ public class WorkerConfig {
@Value("${worker.reserved.memory:0.3}")
private double workerReservedMemory;
- @Value("${worker.group: default}")
- private String workerGroup;
+ @Value("#{'${worker.groups:default}'.split(',')}")
+ private Set<String> workerGroups;
@Value("${worker.listen.port: 1234}")
private int listenPort;
@@ -55,12 +57,12 @@ public class WorkerConfig {
this.listenPort = listenPort;
}
- public String getWorkerGroup() {
- return workerGroup;
+ public Set<String> getWorkerGroups() {
+ return workerGroups;
}
- public void setWorkerGroup(String workerGroup) {
- this.workerGroup = workerGroup;
+ public void setWorkerGroups(Set<String> workerGroups) {
+ this.workerGroups = workerGroups;
}
public int getWorkerExecThreads() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index e1349ea..5e400e1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -16,10 +16,20 @@
*/
package org.apache.dolphinscheduler.server.worker.registry;
+import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -32,15 +42,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import javax.annotation.PostConstruct;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.SLASH;
+import com.google.common.collect.Sets;
/**
@@ -74,11 +76,11 @@ public class WorkerRegistry {
private String startTime;
- private String workerGroup;
+ private Set<String> workerGroups;
@PostConstruct
- public void init(){
- this.workerGroup = workerConfig.getWorkerGroup();
+ public void init() {
+ this.workerGroups = workerConfig.getWorkerGroups();
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
}
@@ -88,31 +90,35 @@ public class WorkerRegistry {
*/
public void registry() {
String address = NetUtils.getHost();
- String localNodePath = getWorkerPath();
-
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
"");
-
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState
newState) {
- if(newState == ConnectionState.LOST){
- logger.error("worker : {} connection lost from zookeeper",
address);
- } else if(newState == ConnectionState.RECONNECTED){
- logger.info("worker : {} reconnected to zookeeper",
address);
-
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
"");
- } else if(newState == ConnectionState.SUSPENDED){
- logger.warn("worker : {} connection SUSPENDED ", address);
- }
- }
- });
+ Set<String> workerZkPaths = getWorkerZkPaths();
int workerHeartbeatInterval =
workerConfig.getWorkerHeartbeatInterval();
- HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
- workerConfig.getWorkerReservedMemory(),
- workerConfig.getWorkerMaxCpuloadAvg(),
- getWorkerPath(),
- zookeeperRegistryCenter);
- this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
- logger.info("worker node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, workerHeartbeatInterval);
+ for (String workerZKPath : workerZkPaths) {
+
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
"");
+
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState) {
+ if (newState == ConnectionState.LOST) {
+ logger.error("worker : {} connection lost from
zookeeper", address);
+ } else if (newState == ConnectionState.RECONNECTED) {
+ logger.info("worker : {} reconnected to zookeeper",
address);
+
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
"");
+ } else if (newState == ConnectionState.SUSPENDED) {
+ logger.warn("worker : {} connection SUSPENDED ",
address);
+ }
+ }
+ });
+ logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
+ }
+ HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
+ this.workerConfig.getWorkerReservedMemory(),
+ this.workerConfig.getWorkerMaxCpuloadAvg(),
+ workerZkPaths,
+ this.zookeeperRegistryCenter);
+
+ this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
+ logger.info("worker node : {} heartbeat interval {} s", address,
workerHeartbeatInterval);
}
/**
@@ -120,36 +126,41 @@ public class WorkerRegistry {
*/
public void unRegistry() {
String address = getLocalAddress();
- String localNodePath = getWorkerPath();
-
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+ Set<String> workerZkPaths = getWorkerZkPaths();
+ for (String workerZkPath : workerZkPaths) {
+
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
+ logger.info("worker node : {} unRegistry from ZK {}.", address,
workerZkPath);
+ }
this.heartBeatExecutor.shutdownNow();
- logger.info("worker node : {} unRegistry to ZK.", address);
}
/**
* get worker path
- * @return
*/
- private String getWorkerPath() {
+ private Set<String> getWorkerZkPaths() {
+ Set<String> workerZkPaths = Sets.newHashSet();
+
String address = getLocalAddress();
- StringBuilder builder = new StringBuilder(100);
- String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
- builder.append(workerPath).append(SLASH);
- if(StringUtils.isEmpty(workerGroup)){
- workerGroup = DEFAULT_WORKER_GROUP;
+ String workerZkPathPrefix =
this.zookeeperRegistryCenter.getWorkerPath();
+
+ for (String workGroup : this.workerGroups) {
+ StringBuilder workerZkPathBuilder = new StringBuilder(100);
+ workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH);
+ if (StringUtils.isEmpty(workGroup)) {
+ workGroup = DEFAULT_WORKER_GROUP;
+ }
+ // trim and lower case is need
+
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
+ workerZkPathBuilder.append(address);
+ workerZkPaths.add(workerZkPathBuilder.toString());
}
- //trim and lower case is need
- builder.append(workerGroup.trim().toLowerCase()).append(SLASH);
- builder.append(address);
- return builder.toString();
+ return workerZkPaths;
}
/**
* get local address
- * @return
*/
- private String getLocalAddress(){
+ private String getLocalAddress() {
return NetUtils.getHost() + ":" + workerConfig.getListenPort();
}
-
}
diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties
index eb01bbb..0365c8a 100644
--- a/dolphinscheduler-server/src/main/resources/worker.properties
+++ b/dolphinscheduler-server/src/main/resources/worker.properties
@@ -31,4 +31,4 @@
#worker.listen.port: 1234
# default worker group
-worker.group=default
\ No newline at end of file
+#worker.groups=default
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
index b34ba8b..7fc9d2b 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -17,62 +17,154 @@
package org.apache.dolphinscheduler.server.worker.registry;
+import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
-import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringRunner;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
-import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-
-
-import static
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
/**
* worker registry test
*/
-@RunWith(SpringRunner.class)
-@ContextConfiguration(classes={SpringZKServer.class,
WorkerRegistry.class,ZookeeperRegistryCenter.class, WorkerConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class})
-
+@RunWith(MockitoJUnitRunner.Silent.class)
public class WorkerRegistryTest {
- @Autowired
+ private static final Logger LOGGER =
LoggerFactory.getLogger(WorkerRegistryTest.class);
+
+ private static final String TEST_WORKER_GROUP = "test";
+
+ @InjectMocks
private WorkerRegistry workerRegistry;
- @Autowired
+ @Mock
private ZookeeperRegistryCenter zookeeperRegistryCenter;
- @Autowired
+ @Mock
+ private ZookeeperCachedOperator zookeeperCachedOperator;
+
+ @Mock
+ private CuratorFrameworkImpl zkClient;
+
+ @Mock
private WorkerConfig workerConfig;
+ @Before
+ public void before() {
+ Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP,
TEST_WORKER_GROUP);
+ Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
+
+
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
+
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator);
+
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient);
+
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn(
+ new Listenable<ConnectionStateListener>() {
+ @Override
+ public void addListener(ConnectionStateListener
connectionStateListener) {
+ LOGGER.info("add listener");
+ }
+
+ @Override
+ public void addListener(ConnectionStateListener
connectionStateListener, Executor executor) {
+ LOGGER.info("add listener executor");
+ }
+
+ @Override
+ public void removeListener(ConnectionStateListener
connectionStateListener) {
+ LOGGER.info("remove listener");
+ }
+ });
+
+ Mockito.when(workerConfig.getWorkerHeartbeatInterval()).thenReturn(10);
+
+ Mockito.when(workerConfig.getWorkerReservedMemory()).thenReturn(1.1);
+
+ Mockito.when(workerConfig.getWorkerMaxCpuloadAvg()).thenReturn(1);
+ }
+
@Test
- public void testRegistry() throws InterruptedException {
+ public void testRegistry() {
+
+ workerRegistry.init();
+
workerRegistry.registry();
+
String workerPath = zookeeperRegistryCenter.getWorkerPath();
- Assert.assertEquals(DEFAULT_WORKER_GROUP,
workerConfig.getWorkerGroup().trim());
- String instancePath = workerPath + "/" +
workerConfig.getWorkerGroup().trim() + "/" + (NetUtils.getHost() + ":" +
workerConfig.getListenPort());
- TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2);
//wait heartbeat info write into zk node
- String heartbeat =
zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
- Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH,
heartbeat.split(",").length);
+
+ int i = 0;
+ for (String workerGroup : workerConfig.getWorkerGroups()) {
+ String workerZkPath = workerPath + "/" + workerGroup.trim() + "/"
+ (NetUtils.getHost() + ":" + workerConfig.getListenPort());
+ String heartbeat =
zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
+ if (0 == i) {
+
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
+ } else {
+
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/default/"));
+ }
+ i++;
+ }
+
+ workerRegistry.unRegistry();
+
+ workerConfig.getWorkerGroups().add(StringUtils.EMPTY);
+ workerRegistry.init();
+ workerRegistry.registry();
+
+ workerRegistry.unRegistry();
+
+ // testEmptyWorkerGroupsRegistry
+ workerConfig.getWorkerGroups().remove(StringUtils.EMPTY);
+ workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP);
+ workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP);
+ workerRegistry.init();
+ workerRegistry.registry();
+
+ List<String> testWorkerGroupPathZkChildren =
zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" + TEST_WORKER_GROUP);
+ List<String> defaultWorkerGroupPathZkChildren =
zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" +
DEFAULT_WORKER_GROUP);
+
+ Assert.assertEquals(0, testWorkerGroupPathZkChildren.size());
+ Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size());
}
@Test
- public void testUnRegistry() throws InterruptedException {
+ public void testUnRegistry() {
+ workerRegistry.init();
workerRegistry.registry();
- TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2);
//wait heartbeat info write into zk node
+
workerRegistry.unRegistry();
String workerPath = zookeeperRegistryCenter.getWorkerPath();
- String workerGroupPath = workerPath + "/" +
workerConfig.getWorkerGroup().trim();
- List<String> childrenKeys =
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
- Assert.assertTrue(childrenKeys.isEmpty());
+
+ for (String workerGroup : workerConfig.getWorkerGroups()) {
+ String workerGroupPath = workerPath + "/" + workerGroup.trim();
+ List<String> childrenKeys =
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+ Assert.assertTrue(childrenKeys.isEmpty());
+ }
+
+ // testEmptyWorkerGroupsUnRegistry
+ workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP);
+ workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP);
+ workerRegistry.init();
+ workerRegistry.registry();
+
+ workerRegistry.unRegistry();
}
}
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
new file mode 100644
index 0000000..1201cb5
--- /dev/null
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
@@ -0,0 +1,112 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+<template>
+ <div class="container">
+ <div class="title-box">
+ <span class="name">{{$t('zkDirectory')}}</span>
+ </div>
+
+ <div class="table-box" v-if="zkDirectories.length > 0">
+ <table class="fixed">
+ <caption><!-- placeHolder --></caption>
+ <tr>
+ <th scope="col" style="min-width: 40px">
+ <span>#</span>
+ </th>
+ <th scope="col" style="min-width: 40px">
+ <span>{{$t('zkDirectory')}}</span>
+ </th>
+ </tr>
+ <tr v-for="(item, $index) in zkDirectories" :key="item.id">
+ <td>
+ <span>{{$index + 1}}</span>
+ </td>
+ <td>
+ <span>{{item.zkDirectory}}</span>
+ </td>
+ </tr>
+ </table>
+ </div>
+
+ <div v-if="zkDirectories.length === 0">
+ <m-no-data><!----></m-no-data>
+ </div>
+
+ <div v-if="zkDirectories.length > 0">
+ <div class="bottom-box">
+ </div>
+ </div>
+ </div>
+</template>
+
+<script>
+ import mNoData from '@/module/components/noData/noData'
+
+ export default {
+ name: 'zookeeperDirectoriesPopup',
+ data () {
+ return {
+ tableHeaders: [
+ {
+ label: $t('zkDirectory'),
+ prop: 'zkDirectory'
+ }
+ ]
+ }
+ },
+ props: {
+ zkDirectories: Array
+ },
+ components: { mNoData }
+ }
+</script>
+
+<style lang="scss" rel="stylesheet/scss">
+ .container {
+ width: 500px;
+ .title-box {
+ height: 61px;
+ border-bottom: 1px solid #DCDEDC;
+ position: relative;
+ .name {
+ position: absolute;
+ left: 24px;
+ top: 18px;
+ font-size: 16px;
+ }
+ }
+ .bottom-box {
+ position: absolute;
+ bottom: 0;
+ left: 0;
+ width: 100%;
+ text-align: right;
+ height: 60px;
+ line-height: 60px;
+ border-top: 1px solid #DCDEDC;
+ background: #fff;
+ .ans-page {
+ display: inline-block;
+ }
+ }
+ .table-box {
+ overflow-y: scroll;
+ height: calc(100vh - 61px);
+ padding-bottom: 60px;
+ }
+ }
+</style>
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
index ceaed89..c8c0ed1 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
@@ -22,8 +22,8 @@
<div class="row-title">
<div class="left">
<span class="sp">IP: {{item.host}}</span>
- <span class="sp">{{$t('Process Pid')}}: {{item.id}}</span>
- <span class="sp">{{$t('Zk registration directory')}}:
{{item.zkDirectory}}</span>
+ <span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
+ <span>{{$t('Zk registration directory')}}: <a href="javascript:"
@click="_showZkDirectories(item)" class="links">{{$t('Directory
detail')}}</a></span>
</div>
<div class="right">
<span class="sp">{{$t('Create Time')}}: {{item.createTime |
formatDate}}</span>
@@ -74,6 +74,7 @@
import mNoData from '@/module/components/noData/noData'
import themeData from '@/module/echarts/themeData.json'
import mListConstruction from
'@/module/components/listConstruction/listConstruction'
+ import zookeeperDirectoriesPopup from './_source/zookeeperDirectories'
export default {
name: 'servers-worker',
@@ -86,7 +87,25 @@
},
props: {},
methods: {
- ...mapActions('monitor', ['getWorkerData'])
+ ...mapActions('monitor', ['getWorkerData']),
+ _showZkDirectories (item) {
+ let zkDirectories = []
+ item.zkDirectories.forEach(zkDirectory => {
+ zkDirectories.push({
+ zkDirectory: zkDirectory
+ })
+ })
+ this.$drawer({
+ direction: 'right',
+ render (h) {
+ return h(zookeeperDirectoriesPopup, {
+ props: {
+ zkDirectories: zkDirectories
+ }
+ })
+ }
+ })
+ }
},
watch: {},
created () {
@@ -105,7 +124,7 @@
this.isLoading = true
})
},
- components: { mList, mListConstruction, mSpin, mNoData, mGauge }
+ components: { mList, mListConstruction, mSpin, mNoData, mGauge,
zookeeperDirectoriesPopup }
}
</script>
<style lang="scss" rel="stylesheet/scss">
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index 3e0ed54..9f2c275 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -616,5 +616,7 @@ export default {
'Disable': 'Disable',
'The Worker group no longer exists, please select the correct Worker
group!': 'The Worker group no longer exists, please select the correct Worker
group!',
'Please confirm whether the workflow has been saved before downloading':
'Please confirm whether the workflow has been saved before downloading',
- 'User name length is between 3 and 39': 'User name length is between 3 and
39'
+ 'User name length is between 3 and 39': 'User name length is between 3 and
39',
+ zkDirectory: 'zkDirectory',
+ 'Directory detail': 'Directory detail'
}
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index a610d8b..f55b1c7 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -616,5 +616,7 @@ export default {
'Socket Timeout':'Socket超时',
'Connect timeout be a positive integer': '连接超时必须为数字',
'Socket Timeout be a positive integer': 'Socket超时必须为数字',
- 'ms':'毫秒'
+ 'ms':'毫秒',
+ zkDirectory: 'zk注册目录',
+ 'Directory detail': '查看目录详情'
}
diff --git a/pom.xml b/pom.xml
index 09636e4..0591525 100644
--- a/pom.xml
+++ b/pom.xml
@@ -817,7 +817,7 @@
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<!--<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>-->
-
<!--<include>**/server/worker/registry/WorkerRegistryTest.java</include>-->
+
<include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>