This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 3bad56c Refactor worker (#2042)
3bad56c is described below
commit 3bad56ca157d5b55e9ff49adc2ad37e6dc3458c9
Author: Tboy <[email protected]>
AuthorDate: Sun Mar 1 11:38:37 2020 +0800
Refactor worker (#2042)
* Refactor worker (#10)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add- register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* Refactor worker (#2018)
* Refactor worker (#7)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add- register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <[email protected]>
* Refactor worker (#8)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication
model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <[email protected]>
* updates
Co-authored-by: qiaozhanwei <[email protected]>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <[email protected]>
* updates
* add- register processor
Co-authored-by: qiaozhanwei <[email protected]>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify (#2004)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment (#2006)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type (#2012)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
Co-authored-by: qiaozhanwei <[email protected]>
* add kill command
Co-authored-by: qiaozhanwei <[email protected]>
* add TaskInstanceCacheManager receive Worker report result, modify master
polling db transform to cache (#2021)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
Co-authored-by: qiaozhanwei <[email protected]>
* refactor heartbeat logic
* update registry and add worker group
* add worker group
Co-authored-by: qiaozhanwei <[email protected]>
---
.../apache/dolphinscheduler/common/Constants.java | 10 +++
.../remote/entity/TaskExecutionContext.java | 12 +++
.../master/dispatch/context/ExecutionContext.java | 11 ++-
.../dispatch/executor/NettyExecutorManager.java | 4 +-
.../dispatch/host/RoundRobinHostManager.java | 2 +-
.../server/registry/ZookeeperNodeManager.java | 90 +++++++++++++++-------
.../server/registry/ZookeeperRegistryCenter.java | 28 +++++++
.../server/worker/WorkerServer.java | 42 ++--------
.../cache/TaskExecutionContextCacheManager.java | 1 -
.../impl/TaskExecutionContextCacheManagerImpl.java | 2 +
.../server/worker/config/WorkerConfig.java | 2 +-
.../server/worker/registry/WorkerRegistry.java | 17 ++--
12 files changed, 141 insertions(+), 80 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 73125f4..2aff56e 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -174,6 +174,11 @@ public final class Constants {
public static final String COMMA = ",";
/**
+ * slash /
+ */
+ public static final String SLASH = "/";
+
+ /**
* COLON :
*/
public static final String COLON = ":";
@@ -994,4 +999,9 @@ public final class Constants {
* dataSource sensitive param
*/
public static final String DATASOURCE_PASSWORD_REGEX =
"(?<=(\"password\":\")).*?(?=(\"))";
+
+ /**
+ * default worker group
+ */
+ public static final String DEFAULT_WORKER_GROUP = "default";
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
index 853be75..3ed71e5 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
@@ -160,6 +160,18 @@ public class TaskExecutionContext implements Serializable{
*/
private int taskTimeout;
+ /**
+ * worker group
+ */
+ private String workerGroup;
+
+ public String getWorkerGroup() {
+ return workerGroup;
+ }
+
+ public void setWorkerGroup(String workerGroup) {
+ this.workerGroup = workerGroup;
+ }
public Integer getTaskInstanceId() {
return taskInstanceId;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 14c7d9f..5157dd2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.context;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -33,23 +34,27 @@ public class ExecutionContext {
/**
* context
*/
- private final Object context;
+ private final TaskExecutionContext context;
/**
* executor type : worker or client
*/
private final ExecutorType executorType;
- public ExecutionContext(Object context, ExecutorType executorType) {
+ public ExecutionContext(TaskExecutionContext context, ExecutorType
executorType) {
this.context = context;
this.executorType = executorType;
}
+ public String getWorkerGroup(){
+ return context.getWorkerGroup();
+ }
+
public ExecutorType getExecutorType() {
return executorType;
}
- public Object getContext() {
+ public TaskExecutionContext getContext() {
return context;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index bdfe71c..f4b1dab 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -136,7 +136,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
- TaskExecutionContext taskExecutionContext =
(TaskExecutionContext)context.getContext();
+ TaskExecutionContext taskExecutionContext =
context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
break;
case CLIENT:
@@ -191,7 +191,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean>{
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
- nodes = zookeeperNodeManager.getWorkerNodes();
+ nodes =
zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index 3bb001e..a573632 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -75,7 +75,7 @@ public class RoundRobinHostManager implements HostManager {
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
- nodes = zookeeperNodeManager.getWorkerNodes();
+ nodes =
zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 1d6808d..590a25f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.server.registry;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,9 +32,12 @@ import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
/**
* zookeeper node manager
*/
@@ -47,14 +52,14 @@ public class ZookeeperNodeManager implements
InitializingBean {
private final Lock masterLock = new ReentrantLock();
/**
- * worker lock
+ * worker group lock
*/
- private final Lock workerLock = new ReentrantLock();
+ private final Lock workerGroupLock = new ReentrantLock();
/**
- * worker nodes
+ * worker group nodes
*/
- private final Set<String> workerNodes = new HashSet<>();
+ private final ConcurrentHashMap<String, Set<String>> workerGroupNodes =
new ConcurrentHashMap<>();
/**
* master nodes
@@ -84,7 +89,7 @@ public class ZookeeperNodeManager implements InitializingBean
{
/**
* init WorkerNodeListener listener
*/
- registryCenter.getZookeeperCachedOperator().addListener(new
WorkerNodeListener());
+ registryCenter.getZookeeperCachedOperator().addListener(new
WorkerGroupNodeListener());
}
/**
@@ -98,39 +103,55 @@ public class ZookeeperNodeManager implements
InitializingBean {
syncMasterNodes(masterNodes);
/**
- * worker nodes from zookeeper
+ * worker group nodes from zookeeper
*/
- Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
- syncWorkerNodes(workersNodes);
+ Set<String> workerGroups = registryCenter.getWorkerGroupDirectly();
+ for(String workerGroup : workerGroups){
+ syncWorkerGroupNodes(workerGroup,
registryCenter.getWorkerGroupNodesDirectly(workerGroup));
+ }
}
/**
- * worker node listener
+ * worker group node listener
*/
- class WorkerNodeListener extends AbstractListener {
+ class WorkerGroupNodeListener extends AbstractListener {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
if(registryCenter.isWorkerPath(path)){
try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
- logger.info("worker node : {} added.", path);
+ logger.info("worker group node : {} added.", path);
+ String group = parseGroup(path);
+ Set<String> workerNodes =
workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes);
- Set<String> currentNodes =
registryCenter.getWorkerNodesDirectly();
- syncWorkerNodes(currentNodes);
+ Set<String> currentNodes =
registryCenter.getWorkerGroupNodesDirectly(group);
+ logger.info("currentNodes : {}", currentNodes);
+ syncWorkerGroupNodes(group, currentNodes);
} else if (event.getType() ==
TreeCacheEvent.Type.NODE_REMOVED) {
- logger.info("worker node : {} down.", path);
+ logger.info("worker group node : {} down.", path);
+ String group = parseGroup(path);
+ Set<String> workerNodes =
workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes);
- Set<String> currentNodes =
registryCenter.getWorkerNodesDirectly();
- syncWorkerNodes(currentNodes);
+ Set<String> currentNodes =
registryCenter.getWorkerGroupNodesDirectly(group);
+ syncWorkerGroupNodes(group, currentNodes);
}
} catch (IllegalArgumentException ignore) {
logger.warn(ignore.getMessage());
} catch (Exception ex) {
- logger.error("WorkerListener capture data change and get
data failed", ex);
+ logger.error("WorkerGroupListener capture data change and
get data failed", ex);
}
}
}
+
+ private String parseGroup(String path){
+ String[] parts = path.split("\\/");
+ if(parts.length != 6){
+ throw new IllegalArgumentException(String.format("worker group
path : %s is not valid, ignore", path));
+ }
+ String group = parts[4];
+ return group;
+ }
}
@@ -189,29 +210,42 @@ public class ZookeeperNodeManager implements
InitializingBean {
}
/**
- * sync worker nodes
- * @param nodes worker nodes
+ * sync worker group nodes
+ * @param workerGroup
+ * @param nodes
*/
- private void syncWorkerNodes(Set<String> nodes){
- workerLock.lock();
+ private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes){
+ workerGroupLock.lock();
try {
+ workerGroup = workerGroup.toLowerCase();
+ Set<String> workerNodes =
workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
+ workerGroupNodes.put(workerGroup, workerNodes);
} finally {
- workerLock.unlock();
+ workerGroupLock.unlock();
}
}
/**
- * get worker nodes
- * @return worker nodes
+ * get worker group nodes
+ * @param workerGroup
+ * @return
*/
- public Set<String> getWorkerNodes(){
- workerLock.lock();
+ public Set<String> getWorkerGroupNodes(String workerGroup){
+ workerGroupLock.lock();
try {
- return Collections.unmodifiableSet(workerNodes);
+ if(StringUtils.isEmpty(workerGroup)){
+ workerGroup = DEFAULT_WORKER_GROUP;
+ }
+ workerGroup = workerGroup.toLowerCase();
+ Set<String> nodes = workerGroupNodes.get(workerGroup);
+ if(CollectionUtils.isNotEmpty(nodes)){
+ return Collections.unmodifiableSet(nodes);
+ }
+ return nodes;
} finally {
- workerLock.unlock();
+ workerGroupLock.unlock();
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 7d7e2ef..a6a3ea0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -128,6 +128,25 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
}
/**
+ * get worker group directly
+ * @return
+ */
+ public Set<String> getWorkerGroupDirectly() {
+ List<String> workers = getChildrenKeys(getWorkerPath());
+ return new HashSet<>(workers);
+ }
+
+ /**
+ * get worker group nodes
+ * @param workerGroup
+ * @return
+ */
+ public Set<String> getWorkerGroupNodesDirectly(String workerGroup) {
+ List<String> workers =
getChildrenKeys(getWorkerGroupPath(workerGroup));
+ return new HashSet<>(workers);
+ }
+
+ /**
* whether worker path
* @param path path
* @return result
@@ -146,6 +165,15 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
}
/**
+ * get worker group path
+ * @param workerGroup
+ * @return
+ */
+ public String getWorkerGroupPath(String workerGroup) {
+ return WORKER_PATH + "/" + workerGroup;
+ }
+
+ /**
* get children nodes
* @param key key
* @return children nodes
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 01f66ac..ec43dd8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -37,13 +36,11 @@ import
org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -58,31 +55,18 @@ public class WorkerServer implements IStoppable {
*/
private static final Logger logger =
LoggerFactory.getLogger(WorkerServer.class);
-
/**
* zk worker client
*/
@Autowired
private ZKWorkerClient zkWorkerClient = null;
-
- /**
- * alert database access
- */
- @Autowired
- private AlertDao alertDao;
-
/**
* task queue impl
*/
protected ITaskQueue taskQueue;
/**
- * kill executor service
- */
- private ExecutorService killExecutorService;
-
- /**
* fetch task executor service
*/
private ExecutorService fetchTaskExecutorService;
@@ -92,9 +76,6 @@ public class WorkerServer implements IStoppable {
*/
private CountDownLatch latch;
- @Value("${server.is-combined-server:false}")
- private Boolean isCombinedServer;
-
/**
* worker config
*/
@@ -157,8 +138,6 @@ public class WorkerServer implements IStoppable {
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
- this.killExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
-
this.fetchTaskExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
zkWorkerClient.setStoppable(this);
@@ -169,17 +148,15 @@ public class WorkerServer implements IStoppable {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- stop("shutdownhook");
+ stop("shutdownHook");
}
}));
//let the main thread await
latch = new CountDownLatch(1);
- if (!isCombinedServer) {
- try {
- latch.await();
- } catch (InterruptedException ignore) {
- }
+ try {
+ latch.await();
+ } catch (InterruptedException ignore) {
}
}
@@ -210,17 +187,10 @@ public class WorkerServer implements IStoppable {
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
- logger.warn("threadpool service stopped
exception:{}",e.getMessage());
+ logger.warn("threadPool service stopped
exception:{}",e.getMessage());
}
- logger.info("threadpool service stopped");
-
- try {
- killExecutorService.shutdownNow();
- }catch (Exception e){
- logger.warn("worker kill executor service stopped
exception:{}",e.getMessage());
- }
- logger.info("worker kill executor service stopped");
+ logger.info("threadPool service stopped");
try {
fetchTaskExecutorService.shutdownNow();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
index a5615ea..db78127 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.cache;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
index b559d58..584c42b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.cache.impl;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
+import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* TaskExecutionContextCache
*/
+@Service
public class TaskExecutionContextCacheManagerImpl implements
TaskExecutionContextCacheManager {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 747b34f..3c7500a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -37,7 +37,7 @@ public class WorkerConfig {
@Value("${worker.reserved.memory}")
private double workerReservedMemory;
- @Value("${worker.group: DEFAULT}")
+ @Value("${worker.group: default}")
private String workerGroup;
public String getWorkerGroup() {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 6876f05..977643c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -33,8 +33,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
-import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
/**
@@ -44,8 +45,6 @@ public class WorkerRegistry {
private final Logger logger =
LoggerFactory.getLogger(WorkerRegistry.class);
- private static final String DEFAULT_GROUP = "DEFAULT";
-
/**
* zookeeper registry center
*/
@@ -74,7 +73,7 @@ public class WorkerRegistry {
/**
* worker group
*/
- private final String workerGroup;
+ private String workerGroup;
/**
* construct
@@ -82,7 +81,7 @@ public class WorkerRegistry {
* @param port port
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int
port, long heartBeatInterval){
- this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP);
+ this(zookeeperRegistryCenter, port, heartBeatInterval,
DEFAULT_WORKER_GROUP);
}
/**
@@ -144,9 +143,11 @@ public class WorkerRegistry {
StringBuilder builder = new StringBuilder(100);
String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
builder.append(workerPath).append(SLASH);
- if(StringUtils.isNotEmpty(workerGroup) &&
!DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){
- builder.append(workerGroup.trim()).append(SLASH);
+ if(StringUtils.isEmpty(workerGroup)){
+ workerGroup = DEFAULT_WORKER_GROUP;
}
+ //trim and lower case is need
+ builder.append(workerGroup.trim().toLowerCase()).append(SLASH);
builder.append(address);
return builder.toString();
}