This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b4af3fd  [Feature-2815][server] One worker can belong to different 
workergroups (#2934)
b4af3fd is described below

commit b4af3fd1764756fde4c5395639d7816d03e77bff
Author: Yichao Yang <[email protected]>
AuthorDate: Mon Jul 13 17:25:13 2020 +0800

    [Feature-2815][server] One worker can belong to different workergroups 
(#2934)
    
    * [Feature-2815][server] One worker can belong to different workergroups
    
    * Feature: Add test cases
    
    * Update MonitorService.java
    
    * Update MonitorService.java
    
    Co-authored-by: dailidong <[email protected]>
---
 .../api/service/MonitorService.java                |  54 ++++++--
 .../common/model/WorkerServerModel.java            | 122 +++++++++++++++++
 .../server/master/registry/MasterRegistry.java     |  15 ++-
 .../server/registry/HeartBeatTask.java             |  33 ++---
 .../server/worker/config/WorkerConfig.java         |  14 +-
 .../server/worker/registry/WorkerRegistry.java     | 115 ++++++++--------
 .../src/main/resources/worker.properties           |   2 +-
 .../server/worker/registry/WorkerRegistryTest.java | 148 +++++++++++++++++----
 .../pages/servers/_source/zookeeperDirectories.vue | 112 ++++++++++++++++
 .../home/pages/monitor/pages/servers/worker.vue    |  27 +++-
 .../src/js/module/i18n/locale/en_US.js             |   4 +-
 .../src/js/module/i18n/locale/zh_CN.js             |   4 +-
 pom.xml                                            |   2 +-
 13 files changed, 525 insertions(+), 127 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
index 3370961..55c4fa1 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
@@ -16,29 +16,33 @@
  */
 package org.apache.dolphinscheduler.api.service;
 
+import static 
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
-import org.apache.dolphinscheduler.dao.MonitorDBDao;
 import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.WorkerServerModel;
+import org.apache.dolphinscheduler.dao.MonitorDBDao;
 import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
+import com.google.common.collect.Sets;
 
 /**
  * monitor service
  */
 @Service
-public class MonitorService extends BaseService{
+public class MonitorService extends BaseService {
 
   @Autowired
   private ZookeeperMonitor zookeeperMonitor;
@@ -108,15 +112,41 @@ public class MonitorService extends BaseService{
   public Map<String,Object> queryWorker(User loginUser) {
 
     Map<String, Object> result = new HashMap<>(5);
-    List<Server> masterServers = getServerListFromZK(false);
-
-    result.put(Constants.DATA_LIST, masterServers);
+    List<WorkerServerModel> workerServers = getServerListFromZK(false)
+            .stream()
+            .map((Server server) -> {
+              WorkerServerModel model = new WorkerServerModel();
+              model.setId(server.getId());
+              model.setHost(server.getHost());
+              model.setPort(server.getPort());
+              model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
+              model.setResInfo(server.getResInfo());
+              model.setCreateTime(server.getCreateTime());
+              model.setLastHeartbeatTime(server.getLastHeartbeatTime());
+              return model;
+            })
+            .collect(Collectors.toList());
+
+    Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
+            .stream()
+            .collect(Collectors.toMap(
+                    (WorkerServerModel worker) -> {
+                        String[] s = 
worker.getZkDirectories().iterator().next().split("/");
+                        return s[s.length - 1];
+                    }
+                    , Function.identity()
+                    , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
+                      
oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
+                      return oldOne;
+                    }));
+
+    result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
     putMsg(result,Status.SUCCESS);
 
     return result;
   }
 
-  public List<Server> getServerListFromZK(boolean isMaster){
+  public List<Server> getServerListFromZK(boolean isMaster) {
 
     checkNotNull(zookeeperMonitor);
     ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java
new file mode 100644
index 0000000..984124b
--- /dev/null
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.model;
+
+
+import java.util.Date;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+
+/**
+ * server
+ */
+public class WorkerServerModel {
+
+    /**
+     * id
+     */
+    private int id;
+
+    /**
+     * host
+     */
+    private String host;
+
+    /**
+     * port
+     */
+    private int port;
+
+    /**
+     * worker directories in zookeeper
+     */
+    private Set<String> zkDirectories;
+
+    /**
+     * resource info about CPU and memory
+     */
+    private String resInfo;
+
+    /**
+     * create time
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private Date createTime;
+
+    /**
+     * last heart beat time
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private Date lastHeartbeatTime;
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public Date getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Date createTime) {
+        this.createTime = createTime;
+    }
+
+    public Set<String> getZkDirectories() {
+        return zkDirectories;
+    }
+
+    public void setZkDirectories(Set<String> zkDirectories) {
+        this.zkDirectories = zkDirectories;
+    }
+
+    public Date getLastHeartbeatTime() {
+        return lastHeartbeatTime;
+    }
+
+    public void setLastHeartbeatTime(Date lastHeartbeatTime) {
+        this.lastHeartbeatTime = lastHeartbeatTime;
+    }
+
+    public String getResInfo() {
+        return resInfo;
+    }
+
+    public void setResInfo(String resInfo) {
+        this.resInfo = resInfo;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 04fb79f..040ea5a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -16,6 +16,13 @@
  */
 package org.apache.dolphinscheduler.server.master.registry;
 
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
@@ -30,11 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.PostConstruct;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
 
 /**
  *  master registry
@@ -97,7 +100,7 @@ public class MasterRegistry {
         HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
                 masterConfig.getMasterReservedMemory(),
                 masterConfig.getMasterMaxCpuloadAvg(),
-                getMasterPath(),
+                Sets.newHashSet(getMasterPath()),
                 zookeeperRegistryCenter);
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 2345ce9..bd8c79c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -17,50 +17,50 @@
 
 package org.apache.dolphinscheduler.server.registry;
 
+import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
+
+import java.util.Date;
+import java.util.Set;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Date;
-
-import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
-
-public class HeartBeatTask extends Thread{
+public class HeartBeatTask extends Thread {
 
     private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
 
     private String startTime;
     private double reservedMemory;
     private double maxCpuloadAvg;
-    private String heartBeatPath;
+    private Set<String> heartBeatPaths;
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     public HeartBeatTask(String startTime,
                          double reservedMemory,
                          double maxCpuloadAvg,
-                         String heartBeatPath,
-                         ZookeeperRegistryCenter zookeeperRegistryCenter){
+                         Set<String> heartBeatPaths,
+                         ZookeeperRegistryCenter zookeeperRegistryCenter) {
         this.startTime = startTime;
         this.reservedMemory = reservedMemory;
         this.maxCpuloadAvg = maxCpuloadAvg;
-        this.heartBeatPath = heartBeatPath;
+        this.heartBeatPaths = heartBeatPaths;
         this.zookeeperRegistryCenter = zookeeperRegistryCenter;
     }
 
     @Override
     public void run() {
         try {
-
             double availablePhysicalMemorySize = 
OSUtils.availablePhysicalMemorySize();
             double loadAverage = OSUtils.loadAverage();
 
             int status = Constants.NORAML_NODE_STATUS;
 
-            if(availablePhysicalMemorySize < reservedMemory
-                    || loadAverage > maxCpuloadAvg){
-                logger.warn("load is too high or 
availablePhysicalMemorySize(G) is too low, it's 
availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , 
loadAverage);
+            if (availablePhysicalMemorySize < reservedMemory
+                    || loadAverage > maxCpuloadAvg) {
+                logger.warn("load is too high or 
availablePhysicalMemorySize(G) is too low, it's 
availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, 
loadAverage);
                 status = Constants.ABNORMAL_NODE_STATUS;
             }
 
@@ -76,8 +76,11 @@ public class HeartBeatTask extends Thread{
             builder.append(status).append(COMMA);
             //save process id
             builder.append(OSUtils.getProcessID());
-            
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, 
builder.toString());
-        } catch (Throwable ex){
+
+            for (String heartBeatPath : heartBeatPaths) {
+                
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, 
builder.toString());
+            }
+        } catch (Throwable ex) {
             logger.error("error write heartbeat info", ex);
         }
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 1a31fa0..2dedaf8 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.dolphinscheduler.server.worker.config;
 
+import java.util.Set;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.PropertySource;
@@ -41,8 +43,8 @@ public class WorkerConfig {
     @Value("${worker.reserved.memory:0.3}")
     private double workerReservedMemory;
 
-    @Value("${worker.group: default}")
-    private String workerGroup;
+    @Value("#{'${worker.groups:default}'.split(',')}")
+    private Set<String> workerGroups;
 
     @Value("${worker.listen.port: 1234}")
     private int listenPort;
@@ -55,12 +57,12 @@ public class WorkerConfig {
         this.listenPort = listenPort;
     }
 
-    public String getWorkerGroup() {
-        return workerGroup;
+    public Set<String> getWorkerGroups() {
+        return workerGroups;
     }
 
-    public void setWorkerGroup(String workerGroup) {
-        this.workerGroup = workerGroup;
+    public void setWorkerGroups(Set<String> workerGroups) {
+        this.workerGroups = workerGroups;
     }
 
     public int getWorkerExecThreads() {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index e1349ea..5e400e1 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -16,10 +16,20 @@
  */
 package org.apache.dolphinscheduler.server.worker.registry;
 
+import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -32,15 +42,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.PostConstruct;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.SLASH;
+import com.google.common.collect.Sets;
 
 
 /**
@@ -74,11 +76,11 @@ public class WorkerRegistry {
     private String startTime;
 
 
-    private String workerGroup;
+    private Set<String> workerGroups;
 
     @PostConstruct
-    public void init(){
-        this.workerGroup = workerConfig.getWorkerGroup();
+    public void init() {
+        this.workerGroups = workerConfig.getWorkerGroups();
         this.startTime = DateUtils.dateToString(new Date());
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
     }
@@ -88,31 +90,35 @@ public class WorkerRegistry {
      */
     public void registry() {
         String address = NetUtils.getHost();
-        String localNodePath = getWorkerPath();
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
 "");
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
 ConnectionStateListener() {
-            @Override
-            public void stateChanged(CuratorFramework client, ConnectionState 
newState) {
-                if(newState == ConnectionState.LOST){
-                    logger.error("worker : {} connection lost from zookeeper", 
address);
-                } else if(newState == ConnectionState.RECONNECTED){
-                    logger.info("worker : {} reconnected to zookeeper", 
address);
-                    
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath,
 "");
-                } else if(newState == ConnectionState.SUSPENDED){
-                    logger.warn("worker : {} connection SUSPENDED ", address);
-                }
-            }
-        });
+        Set<String> workerZkPaths = getWorkerZkPaths();
         int workerHeartbeatInterval = 
workerConfig.getWorkerHeartbeatInterval();
 
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
-                workerConfig.getWorkerReservedMemory(),
-                workerConfig.getWorkerMaxCpuloadAvg(),
-                getWorkerPath(),
-                zookeeperRegistryCenter);
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
-        logger.info("worker node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, workerHeartbeatInterval);
+        for (String workerZKPath : workerZkPaths) {
+            
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
 "");
+            
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new
 ConnectionStateListener() {
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState) {
+                    if (newState == ConnectionState.LOST) {
+                        logger.error("worker : {} connection lost from 
zookeeper", address);
+                    } else if (newState == ConnectionState.RECONNECTED) {
+                        logger.info("worker : {} reconnected to zookeeper", 
address);
+                        
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath,
 "");
+                    } else if (newState == ConnectionState.SUSPENDED) {
+                        logger.warn("worker : {} connection SUSPENDED ", 
address);
+                    }
+                }
+            });
+            logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
+        }
 
+        HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
+                this.workerConfig.getWorkerReservedMemory(),
+                this.workerConfig.getWorkerMaxCpuloadAvg(),
+                workerZkPaths,
+                this.zookeeperRegistryCenter);
+
+        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("worker node : {} heartbeat interval {} s", address, 
workerHeartbeatInterval);
     }
 
     /**
@@ -120,36 +126,41 @@ public class WorkerRegistry {
      */
     public void unRegistry() {
         String address = getLocalAddress();
-        String localNodePath = getWorkerPath();
-        
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
+        Set<String> workerZkPaths = getWorkerZkPaths();
+        for (String workerZkPath : workerZkPaths) {
+            
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
+            logger.info("worker node : {} unRegistry from ZK {}.", address, 
workerZkPath);
+        }
         this.heartBeatExecutor.shutdownNow();
-        logger.info("worker node : {} unRegistry to ZK.", address);
     }
 
     /**
      *  get worker path
-     * @return
      */
-    private String getWorkerPath() {
+    private Set<String> getWorkerZkPaths() {
+        Set<String> workerZkPaths = Sets.newHashSet();
+
         String address = getLocalAddress();
-        StringBuilder builder = new StringBuilder(100);
-        String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
-        builder.append(workerPath).append(SLASH);
-        if(StringUtils.isEmpty(workerGroup)){
-            workerGroup = DEFAULT_WORKER_GROUP;
+        String workerZkPathPrefix = 
this.zookeeperRegistryCenter.getWorkerPath();
+
+        for (String workGroup : this.workerGroups) {
+            StringBuilder workerZkPathBuilder = new StringBuilder(100);
+            workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH);
+            if (StringUtils.isEmpty(workGroup)) {
+                workGroup = DEFAULT_WORKER_GROUP;
+            }
+            // trim and lower case is need
+            
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
+            workerZkPathBuilder.append(address);
+            workerZkPaths.add(workerZkPathBuilder.toString());
         }
-        //trim and lower case is need
-        builder.append(workerGroup.trim().toLowerCase()).append(SLASH);
-        builder.append(address);
-        return builder.toString();
+        return workerZkPaths;
     }
 
     /**
      *  get local address
-     * @return
      */
-    private String getLocalAddress(){
+    private String getLocalAddress() {
         return NetUtils.getHost() + ":" + workerConfig.getListenPort();
     }
-
 }
diff --git a/dolphinscheduler-server/src/main/resources/worker.properties 
b/dolphinscheduler-server/src/main/resources/worker.properties
index eb01bbb..0365c8a 100644
--- a/dolphinscheduler-server/src/main/resources/worker.properties
+++ b/dolphinscheduler-server/src/main/resources/worker.properties
@@ -31,4 +31,4 @@
 #worker.listen.port: 1234
 
 # default worker group
-worker.group=default
\ No newline at end of file
+#worker.groups=default
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
index b34ba8b..7fc9d2b 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -17,62 +17,154 @@
 
 package org.apache.dolphinscheduler.server.worker.registry;
 
+import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
-import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringRunner;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
 
-import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-
-
-import static 
org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
 /**
  * worker registry test
  */
-@RunWith(SpringRunner.class)
-@ContextConfiguration(classes={SpringZKServer.class, 
WorkerRegistry.class,ZookeeperRegistryCenter.class, WorkerConfig.class, 
ZookeeperCachedOperator.class, ZookeeperConfig.class})
-
+@RunWith(MockitoJUnitRunner.Silent.class)
 public class WorkerRegistryTest {
 
-    @Autowired
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(WorkerRegistryTest.class);
+
+    private static final String TEST_WORKER_GROUP = "test";
+
+    @InjectMocks
     private WorkerRegistry workerRegistry;
 
-    @Autowired
+    @Mock
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
-    @Autowired
+    @Mock
+    private ZookeeperCachedOperator zookeeperCachedOperator;
+
+    @Mock
+    private CuratorFrameworkImpl zkClient;
+
+    @Mock
     private WorkerConfig workerConfig;
 
+    @Before
+    public void before() {
+        Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, 
TEST_WORKER_GROUP);
+        Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
+
+        
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
+        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator);
+        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient);
+        
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn(
+                new Listenable<ConnectionStateListener>() {
+                    @Override
+                    public void addListener(ConnectionStateListener 
connectionStateListener) {
+                        LOGGER.info("add listener");
+                    }
+
+                    @Override
+                    public void addListener(ConnectionStateListener 
connectionStateListener, Executor executor) {
+                        LOGGER.info("add listener executor");
+                    }
+
+                    @Override
+                    public void removeListener(ConnectionStateListener 
connectionStateListener) {
+                        LOGGER.info("remove listener");
+                    }
+                });
+
+        Mockito.when(workerConfig.getWorkerHeartbeatInterval()).thenReturn(10);
+
+        Mockito.when(workerConfig.getWorkerReservedMemory()).thenReturn(1.1);
+
+        Mockito.when(workerConfig.getWorkerMaxCpuloadAvg()).thenReturn(1);
+    }
+
     @Test
-    public void testRegistry() throws InterruptedException {
+    public void testRegistry() {
+
+        workerRegistry.init();
+
         workerRegistry.registry();
+
         String workerPath = zookeeperRegistryCenter.getWorkerPath();
-        Assert.assertEquals(DEFAULT_WORKER_GROUP, 
workerConfig.getWorkerGroup().trim());
-        String instancePath = workerPath + "/" + 
workerConfig.getWorkerGroup().trim() + "/" + (NetUtils.getHost() + ":" + 
workerConfig.getListenPort());
-        TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); 
//wait heartbeat info write into zk node
-        String heartbeat = 
zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
-        Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, 
heartbeat.split(",").length);
+
+        int i = 0;
+        for (String workerGroup : workerConfig.getWorkerGroups()) {
+            String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" 
+ (NetUtils.getHost() + ":" + workerConfig.getListenPort());
+            String heartbeat = 
zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
+            if (0 == i) {
+                
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
+            } else {
+                
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/default/"));
+            }
+            i++;
+        }
+
+        workerRegistry.unRegistry();
+
+        workerConfig.getWorkerGroups().add(StringUtils.EMPTY);
+        workerRegistry.init();
+        workerRegistry.registry();
+
+        workerRegistry.unRegistry();
+
+        // testEmptyWorkerGroupsRegistry
+        workerConfig.getWorkerGroups().remove(StringUtils.EMPTY);
+        workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP);
+        workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP);
+        workerRegistry.init();
+        workerRegistry.registry();
+
+        List<String> testWorkerGroupPathZkChildren = 
zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" + TEST_WORKER_GROUP);
+        List<String> defaultWorkerGroupPathZkChildren = 
zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" + 
DEFAULT_WORKER_GROUP);
+
+        Assert.assertEquals(0, testWorkerGroupPathZkChildren.size());
+        Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size());
     }
 
     @Test
-    public void testUnRegistry() throws InterruptedException {
+    public void testUnRegistry() {
+        workerRegistry.init();
         workerRegistry.registry();
-        TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); 
//wait heartbeat info write into zk node
+
         workerRegistry.unRegistry();
         String workerPath = zookeeperRegistryCenter.getWorkerPath();
-        String workerGroupPath = workerPath + "/" + 
workerConfig.getWorkerGroup().trim();
-        List<String> childrenKeys = 
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
-        Assert.assertTrue(childrenKeys.isEmpty());
+
+        for (String workerGroup : workerConfig.getWorkerGroups()) {
+            String workerGroupPath = workerPath + "/" + workerGroup.trim();
+            List<String> childrenKeys = 
zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+            Assert.assertTrue(childrenKeys.isEmpty());
+        }
+
+        // testEmptyWorkerGroupsUnRegistry
+        workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP);
+        workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP);
+        workerRegistry.init();
+        workerRegistry.registry();
+
+        workerRegistry.unRegistry();
     }
 }
diff --git 
a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
 
b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
new file mode 100644
index 0000000..1201cb5
--- /dev/null
+++ 
b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
@@ -0,0 +1,112 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+<template>
+  <div class="container">
+    <div class="title-box">
+      <span class="name">{{$t('zkDirectory')}}</span>
+    </div>
+
+    <div class="table-box" v-if="zkDirectories.length > 0">
+      <table class="fixed">
+        <caption><!-- placeHolder --></caption>
+        <tr>
+          <th scope="col" style="min-width: 40px">
+            <span>#</span>
+          </th>
+          <th scope="col" style="min-width: 40px">
+            <span>{{$t('zkDirectory')}}</span>
+          </th>
+        </tr>
+        <tr v-for="(item, $index) in zkDirectories" :key="item.id">
+          <td>
+            <span>{{$index + 1}}</span>
+          </td>
+          <td>
+            <span>{{item.zkDirectory}}</span>
+          </td>
+        </tr>
+      </table>
+    </div>
+
+    <div v-if="zkDirectories.length === 0">
+      <m-no-data><!----></m-no-data>
+    </div>
+
+    <div v-if="zkDirectories.length > 0">
+      <div class="bottom-box">
+      </div>
+    </div>
+  </div>
+</template>
+
+<script>
+  import mNoData from '@/module/components/noData/noData'
+
+  export default {
+    name: 'zookeeperDirectoriesPopup',
+    data () {
+      return {
+        tableHeaders: [
+          {
+            label: $t('zkDirectory'),
+            prop: 'zkDirectory'
+          }
+        ]
+      }
+    },
+    props: {
+      zkDirectories: Array
+    },
+    components: { mNoData }
+  }
+</script>
+
+<style lang="scss" rel="stylesheet/scss">
+  .container {
+    width: 500px;
+    .title-box {
+      height: 61px;
+      border-bottom: 1px solid #DCDEDC;
+      position: relative;
+      .name {
+        position: absolute;
+        left: 24px;
+        top: 18px;
+        font-size: 16px;
+      }
+    }
+    .bottom-box {
+      position: absolute;
+      bottom: 0;
+      left: 0;
+      width: 100%;
+      text-align: right;
+      height: 60px;
+      line-height: 60px;
+      border-top: 1px solid #DCDEDC;
+      background: #fff;
+      .ans-page {
+        display: inline-block;
+      }
+    }
+    .table-box {
+      overflow-y: scroll;
+      height: calc(100vh - 61px);
+      padding-bottom: 60px;
+    }
+  }
+</style>
diff --git 
a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue 
b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
index ceaed89..c8c0ed1 100644
--- 
a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
+++ 
b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
@@ -22,8 +22,8 @@
           <div class="row-title">
             <div class="left">
               <span class="sp">IP: {{item.host}}</span>
-              <span class="sp">{{$t('Process Pid')}}: {{item.id}}</span>
-              <span class="sp">{{$t('Zk registration directory')}}: 
{{item.zkDirectory}}</span>
+              <span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
+              <span>{{$t('Zk registration directory')}}: <a href="javascript:" 
@click="_showZkDirectories(item)" class="links">{{$t('Directory 
detail')}}</a></span>
             </div>
             <div class="right">
               <span class="sp">{{$t('Create Time')}}: {{item.createTime | 
formatDate}}</span>
@@ -74,6 +74,7 @@
   import mNoData from '@/module/components/noData/noData'
   import themeData from '@/module/echarts/themeData.json'
   import mListConstruction from 
'@/module/components/listConstruction/listConstruction'
+  import zookeeperDirectoriesPopup from './_source/zookeeperDirectories'
 
   export default {
     name: 'servers-worker',
@@ -86,7 +87,25 @@
     },
     props: {},
     methods: {
-      ...mapActions('monitor', ['getWorkerData'])
+      ...mapActions('monitor', ['getWorkerData']),
+      _showZkDirectories (item) {
+        let zkDirectories = []
+        item.zkDirectories.forEach(zkDirectory => {
+          zkDirectories.push({
+            zkDirectory: zkDirectory
+          })
+        })
+        this.$drawer({
+          direction: 'right',
+          render (h) {
+            return h(zookeeperDirectoriesPopup, {
+              props: {
+                zkDirectories: zkDirectories
+              }
+            })
+          }
+        })
+      }
     },
     watch: {},
     created () {
@@ -105,7 +124,7 @@
         this.isLoading = true
       })
     },
-    components: { mList, mListConstruction, mSpin, mNoData, mGauge }
+    components: { mList, mListConstruction, mSpin, mNoData, mGauge, 
zookeeperDirectoriesPopup }
   }
 </script>
 <style lang="scss" rel="stylesheet/scss">
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js 
b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index 3e0ed54..9f2c275 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -616,5 +616,7 @@ export default {
   'Disable': 'Disable',
   'The Worker group no longer exists, please select the correct Worker 
group!': 'The Worker group no longer exists, please select the correct Worker 
group!',
   'Please confirm whether the workflow has been saved before downloading': 
'Please confirm whether the workflow has been saved before downloading',
-  'User name length is between 3 and 39': 'User name length is between 3 and 
39'
+  'User name length is between 3 and 39': 'User name length is between 3 and 
39',
+  zkDirectory: 'zkDirectory',
+  'Directory detail': 'Directory detail'
 }
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js 
b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index a610d8b..f55b1c7 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -616,5 +616,7 @@ export default {
   'Socket Timeout':'Socket超时',
   'Connect timeout be a positive integer': '连接超时必须为数字',
   'Socket Timeout be a positive integer': 'Socket超时必须为数字',
-  'ms':'毫秒'
+  'ms':'毫秒',
+  zkDirectory: 'zk注册目录',
+  'Directory detail': '查看目录详情'
 }
diff --git a/pom.xml b/pom.xml
index 09636e4..0591525 100644
--- a/pom.xml
+++ b/pom.xml
@@ -817,7 +817,7 @@
                         
<include>**/server/utils/ProcessUtilsTest.java</include>
                         
<include>**/server/utils/SparkArgsUtilsTest.java</include>
                         
<!--<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>-->
-                        
<!--<include>**/server/worker/registry/WorkerRegistryTest.java</include>-->
+                        
<include>**/server/worker/registry/WorkerRegistryTest.java</include>
                         
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
                         
<include>**/server/worker/sql/SqlExecutorTest.java</include>
                         
<include>**/server/worker/task/spark/SparkTaskTest.java</include>

Reply via email to