This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 3bad56c  Refactor worker (#2042)
3bad56c is described below

commit 3bad56ca157d5b55e9ff49adc2ad37e6dc3458c9
Author: Tboy <[email protected]>
AuthorDate: Sun Mar 1 11:38:37 2020 +0800

    Refactor worker (#2042)
    
    * Refactor worker (#10)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication 
model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * Refactor worker (#2018)
    
    * Refactor worker (#7)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication 
model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * Refactor worker (#8)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1, master persistent task 2. extract  master and worker communication 
model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * updates
    
    * add- register processor
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify (#2004)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment (#2006)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * add kill command
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * add TaskInstanceCacheManager receive Worker report result, modify master 
polling db transform to cache (#2021)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    Co-authored-by: qiaozhanwei <[email protected]>
    
    * refactor heartbeat logic
    
    * update registry and add worker group
    
    * add worker group
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../apache/dolphinscheduler/common/Constants.java  | 10 +++
 .../remote/entity/TaskExecutionContext.java        | 12 +++
 .../master/dispatch/context/ExecutionContext.java  | 11 ++-
 .../dispatch/executor/NettyExecutorManager.java    |  4 +-
 .../dispatch/host/RoundRobinHostManager.java       |  2 +-
 .../server/registry/ZookeeperNodeManager.java      | 90 +++++++++++++++-------
 .../server/registry/ZookeeperRegistryCenter.java   | 28 +++++++
 .../server/worker/WorkerServer.java                | 42 ++--------
 .../cache/TaskExecutionContextCacheManager.java    |  1 -
 .../impl/TaskExecutionContextCacheManagerImpl.java |  2 +
 .../server/worker/config/WorkerConfig.java         |  2 +-
 .../server/worker/registry/WorkerRegistry.java     | 17 ++--
 12 files changed, 141 insertions(+), 80 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 73125f4..2aff56e 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -174,6 +174,11 @@ public final class Constants {
     public static final String COMMA = ",";
 
     /**
+     * slash /
+     */
+    public static final String SLASH = "/";
+
+    /**
      * COLON :
      */
     public static final String COLON = ":";
@@ -994,4 +999,9 @@ public final class Constants {
      * dataSource sensitive param
      */
     public static final String DATASOURCE_PASSWORD_REGEX = 
"(?<=(\"password\":\")).*?(?=(\"))";
+
+    /**
+     * default worker group
+     */
+    public static final String DEFAULT_WORKER_GROUP = "default";
 }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
index 853be75..3ed71e5 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
@@ -160,6 +160,18 @@ public class TaskExecutionContext implements Serializable{
      */
     private int taskTimeout;
 
+    /**
+     * worker group
+     */
+    private String workerGroup;
+
+    public String getWorkerGroup() {
+        return workerGroup;
+    }
+
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
+    }
 
     public Integer getTaskInstanceId() {
         return taskInstanceId;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 14c7d9f..5157dd2 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.server.master.dispatch.context;
 
 
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 
@@ -33,23 +34,27 @@ public class ExecutionContext {
     /**
      *  context
      */
-    private final Object context;
+    private final TaskExecutionContext context;
 
     /**
      *  executor type : worker or client
      */
     private final ExecutorType executorType;
 
-    public ExecutionContext(Object context, ExecutorType executorType) {
+    public ExecutionContext(TaskExecutionContext context, ExecutorType 
executorType) {
         this.context = context;
         this.executorType = executorType;
     }
 
+    public String getWorkerGroup(){
+        return context.getWorkerGroup();
+    }
+
     public ExecutorType getExecutorType() {
         return executorType;
     }
 
-    public Object getContext() {
+    public TaskExecutionContext getContext() {
         return context;
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index bdfe71c..f4b1dab 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -136,7 +136,7 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean>{
         ExecutorType executorType = context.getExecutorType();
         switch (executorType){
             case WORKER:
-                TaskExecutionContext taskExecutionContext = 
(TaskExecutionContext)context.getContext();
+                TaskExecutionContext taskExecutionContext = 
context.getContext();
                 
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
                 break;
             case CLIENT:
@@ -191,7 +191,7 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean>{
         ExecutorType executorType = context.getExecutorType();
         switch (executorType){
             case WORKER:
-                nodes = zookeeperNodeManager.getWorkerNodes();
+                nodes = 
zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
                 break;
             case CLIENT:
                 break;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index 3bb001e..a573632 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -75,7 +75,7 @@ public class RoundRobinHostManager implements HostManager {
         ExecutorType executorType = context.getExecutorType();
         switch (executorType){
             case WORKER:
-                nodes = zookeeperNodeManager.getWorkerNodes();
+                nodes = 
zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
                 break;
             case CLIENT:
                 break;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 1d6808d..590a25f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -17,9 +17,11 @@
 
 package org.apache.dolphinscheduler.server.registry;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.curator.framework.CuratorFramework;
 
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.service.zk.AbstractListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,9 +32,12 @@ import org.springframework.stereotype.Service;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
 /**
  *  zookeeper node manager
  */
@@ -47,14 +52,14 @@ public class ZookeeperNodeManager implements 
InitializingBean {
     private final Lock masterLock = new ReentrantLock();
 
     /**
-     *  worker lock
+     *  worker group lock
      */
-    private final Lock workerLock = new ReentrantLock();
+    private final Lock workerGroupLock = new ReentrantLock();
 
     /**
-     *  worker nodes
+     *  worker group nodes
      */
-    private final Set<String> workerNodes = new HashSet<>();
+    private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = 
new ConcurrentHashMap<>();
 
     /**
      *  master nodes
@@ -84,7 +89,7 @@ public class ZookeeperNodeManager implements InitializingBean 
{
         /**
          * init WorkerNodeListener listener
          */
-        registryCenter.getZookeeperCachedOperator().addListener(new 
WorkerNodeListener());
+        registryCenter.getZookeeperCachedOperator().addListener(new 
WorkerGroupNodeListener());
     }
 
     /**
@@ -98,39 +103,55 @@ public class ZookeeperNodeManager implements 
InitializingBean {
         syncMasterNodes(masterNodes);
 
         /**
-         * worker nodes from zookeeper
+         * worker group nodes from zookeeper
          */
-        Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
-        syncWorkerNodes(workersNodes);
+        Set<String> workerGroups = registryCenter.getWorkerGroupDirectly();
+        for(String workerGroup : workerGroups){
+            syncWorkerGroupNodes(workerGroup, 
registryCenter.getWorkerGroupNodesDirectly(workerGroup));
+        }
     }
 
     /**
-     *  worker node listener
+     *  worker group node listener
      */
-    class WorkerNodeListener extends AbstractListener {
+    class WorkerGroupNodeListener extends AbstractListener {
 
         @Override
         protected void dataChanged(CuratorFramework client, TreeCacheEvent 
event, String path) {
             if(registryCenter.isWorkerPath(path)){
                 try {
                     if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
-                        logger.info("worker node : {} added.", path);
+                        logger.info("worker group node : {} added.", path);
+                        String group = parseGroup(path);
+                        Set<String> workerNodes = 
workerGroupNodes.getOrDefault(group, new HashSet<>());
                         Set<String> previousNodes = new HashSet<>(workerNodes);
-                        Set<String> currentNodes = 
registryCenter.getWorkerNodesDirectly();
-                        syncWorkerNodes(currentNodes);
+                        Set<String> currentNodes = 
registryCenter.getWorkerGroupNodesDirectly(group);
+                        logger.info("currentNodes : {}", currentNodes);
+                        syncWorkerGroupNodes(group, currentNodes);
                     } else if (event.getType() == 
TreeCacheEvent.Type.NODE_REMOVED) {
-                        logger.info("worker node : {} down.", path);
+                        logger.info("worker group node : {} down.", path);
+                        String group = parseGroup(path);
+                        Set<String> workerNodes = 
workerGroupNodes.getOrDefault(group, new HashSet<>());
                         Set<String> previousNodes = new HashSet<>(workerNodes);
-                        Set<String> currentNodes = 
registryCenter.getWorkerNodesDirectly();
-                        syncWorkerNodes(currentNodes);
+                        Set<String> currentNodes = 
registryCenter.getWorkerGroupNodesDirectly(group);
+                        syncWorkerGroupNodes(group, currentNodes);
                     }
                 } catch (IllegalArgumentException ignore) {
                     logger.warn(ignore.getMessage());
                 } catch (Exception ex) {
-                    logger.error("WorkerListener capture data change and get 
data failed", ex);
+                    logger.error("WorkerGroupListener capture data change and 
get data failed", ex);
                 }
             }
         }
+
+        private String parseGroup(String path){
+            String[] parts = path.split("\\/");
+            if(parts.length != 6){
+                throw new IllegalArgumentException(String.format("worker group 
path : %s is not valid, ignore", path));
+            }
+            String group = parts[4];
+            return group;
+        }
     }
 
 
@@ -189,29 +210,42 @@ public class ZookeeperNodeManager implements 
InitializingBean {
     }
 
     /**
-     *  sync worker nodes
-     * @param nodes worker nodes
+     * sync worker group nodes
+     * @param workerGroup
+     * @param nodes
      */
-    private void syncWorkerNodes(Set<String> nodes){
-        workerLock.lock();
+    private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes){
+        workerGroupLock.lock();
         try {
+            workerGroup = workerGroup.toLowerCase();
+            Set<String> workerNodes = 
workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
             workerNodes.clear();
             workerNodes.addAll(nodes);
+            workerGroupNodes.put(workerGroup, workerNodes);
         } finally {
-            workerLock.unlock();
+            workerGroupLock.unlock();
         }
     }
 
     /**
-     * get worker nodes
-     * @return worker nodes
+     * get worker group nodes
+     * @param workerGroup
+     * @return
      */
-    public Set<String> getWorkerNodes(){
-        workerLock.lock();
+    public Set<String> getWorkerGroupNodes(String workerGroup){
+        workerGroupLock.lock();
         try {
-            return Collections.unmodifiableSet(workerNodes);
+            if(StringUtils.isEmpty(workerGroup)){
+                workerGroup = DEFAULT_WORKER_GROUP;
+            }
+            workerGroup = workerGroup.toLowerCase();
+            Set<String> nodes = workerGroupNodes.get(workerGroup);
+            if(CollectionUtils.isNotEmpty(nodes)){
+                return Collections.unmodifiableSet(nodes);
+            }
+            return nodes;
         } finally {
-            workerLock.unlock();
+            workerGroupLock.unlock();
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 7d7e2ef..a6a3ea0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -128,6 +128,25 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
     }
 
     /**
+     * get worker group directly
+     * @return
+     */
+    public Set<String> getWorkerGroupDirectly() {
+        List<String> workers = getChildrenKeys(getWorkerPath());
+        return new HashSet<>(workers);
+    }
+
+    /**
+     * get worker group nodes
+     * @param workerGroup
+     * @return
+     */
+    public Set<String> getWorkerGroupNodesDirectly(String workerGroup) {
+        List<String> workers = 
getChildrenKeys(getWorkerGroupPath(workerGroup));
+        return new HashSet<>(workers);
+    }
+
+    /**
      * whether worker path
      * @param path path
      * @return result
@@ -146,6 +165,15 @@ public class ZookeeperRegistryCenter implements 
InitializingBean {
     }
 
     /**
+     * get worker group path
+     * @param workerGroup
+     * @return
+     */
+    public String getWorkerGroupPath(String workerGroup) {
+        return WORKER_PATH + "/" + workerGroup;
+    }
+
+    /**
      * get children nodes
      * @param key key
      * @return children nodes
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 01f66ac..ec43dd8 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -37,13 +36,11 @@ import 
org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 
 import javax.annotation.PostConstruct;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 
@@ -58,31 +55,18 @@ public class WorkerServer implements IStoppable {
      */
     private static final Logger logger = 
LoggerFactory.getLogger(WorkerServer.class);
 
-
     /**
      *  zk worker client
      */
     @Autowired
     private ZKWorkerClient zkWorkerClient = null;
 
-
-    /**
-     *  alert database access
-     */
-    @Autowired
-    private AlertDao alertDao;
-
     /**
      * task queue impl
      */
     protected ITaskQueue taskQueue;
 
     /**
-     * kill executor service
-     */
-    private ExecutorService killExecutorService;
-
-    /**
      *  fetch task executor service
      */
     private ExecutorService fetchTaskExecutorService;
@@ -92,9 +76,6 @@ public class WorkerServer implements IStoppable {
      */
     private CountDownLatch latch;
 
-    @Value("${server.is-combined-server:false}")
-    private Boolean isCombinedServer;
-
     /**
      *  worker config
      */
@@ -157,8 +138,6 @@ public class WorkerServer implements IStoppable {
 
         this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
 
-        this.killExecutorService = 
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
-
         this.fetchTaskExecutorService = 
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
 
         zkWorkerClient.setStoppable(this);
@@ -169,17 +148,15 @@ public class WorkerServer implements IStoppable {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                stop("shutdownhook");
+                stop("shutdownHook");
             }
         }));
 
         //let the main thread await
         latch = new CountDownLatch(1);
-        if (!isCombinedServer) {
-            try {
-                latch.await();
-            } catch (InterruptedException ignore) {
-            }
+        try {
+            latch.await();
+        } catch (InterruptedException ignore) {
         }
     }
 
@@ -210,17 +187,10 @@ public class WorkerServer implements IStoppable {
             try {
                 ThreadPoolExecutors.getInstance().shutdown();
             }catch (Exception e){
-                logger.warn("threadpool service stopped 
exception:{}",e.getMessage());
+                logger.warn("threadPool service stopped 
exception:{}",e.getMessage());
             }
 
-            logger.info("threadpool service stopped");
-
-            try {
-                killExecutorService.shutdownNow();
-            }catch (Exception e){
-                logger.warn("worker kill executor service stopped 
exception:{}",e.getMessage());
-            }
-            logger.info("worker kill executor service stopped");
+            logger.info("threadPool service stopped");
 
             try {
                 fetchTaskExecutorService.shutdownNow();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
index a5615ea..db78127 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.cache;
 
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 
 /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
index b559d58..584c42b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.cache.impl;
 
 import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
+import org.springframework.stereotype.Service;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  *  TaskExecutionContextCache
  */
+@Service
 public class TaskExecutionContextCacheManagerImpl implements 
TaskExecutionContextCacheManager {
 
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 747b34f..3c7500a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -37,7 +37,7 @@ public class WorkerConfig {
     @Value("${worker.reserved.memory}")
     private double workerReservedMemory;
 
-    @Value("${worker.group: DEFAULT}")
+    @Value("${worker.group: default}")
     private String workerGroup;
 
     public String getWorkerGroup() {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 6876f05..977643c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -33,8 +33,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
-import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
 
 
 /**
@@ -44,8 +45,6 @@ public class WorkerRegistry {
 
     private final Logger logger = 
LoggerFactory.getLogger(WorkerRegistry.class);
 
-    private static final String DEFAULT_GROUP = "DEFAULT";
-
     /**
      *  zookeeper registry center
      */
@@ -74,7 +73,7 @@ public class WorkerRegistry {
     /**
      * worker group
      */
-    private final String workerGroup;
+    private String workerGroup;
 
     /**
      *  construct
@@ -82,7 +81,7 @@ public class WorkerRegistry {
      * @param port port
      */
     public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int 
port, long heartBeatInterval){
-        this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP);
+        this(zookeeperRegistryCenter, port, heartBeatInterval, 
DEFAULT_WORKER_GROUP);
     }
 
     /**
@@ -144,9 +143,11 @@ public class WorkerRegistry {
         StringBuilder builder = new StringBuilder(100);
         String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
         builder.append(workerPath).append(SLASH);
-        if(StringUtils.isNotEmpty(workerGroup) && 
!DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){
-            builder.append(workerGroup.trim()).append(SLASH);
+        if(StringUtils.isEmpty(workerGroup)){
+            workerGroup = DEFAULT_WORKER_GROUP;
         }
+        // trimming and lower-casing the worker group are needed
+        builder.append(workerGroup.trim().toLowerCase()).append(SLASH);
         builder.append(address);
         return builder.toString();
     }

Reply via email to