This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new d617f9d refactor worker registry (#2107)
d617f9d is described below
commit d617f9df3281a7097890aa81dbd5d4f95456c243
Author: Tboy <[email protected]>
AuthorDate: Sun Mar 8 12:06:31 2020 +0800
refactor worker registry (#2107)
---
.../server/master/MasterServer.java | 16 +-----
.../server/master/config/MasterConfig.java | 11 +++++
.../master/consumer/TaskUpdateQueueConsumer.java | 8 +++
.../server/master/registry/MasterRegistry.java | 42 ++++++++--------
.../server/worker/WorkerServer.java | 27 ++++------
.../server/worker/config/WorkerConfig.java | 11 +++++
.../server/worker/registry/WorkerRegistry.java | 57 ++++++++--------------
7 files changed, 81 insertions(+), 91 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 292bfae..9ad3597 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -26,19 +25,16 @@ import
org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import
org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,12 +81,6 @@ public class MasterServer {
private MasterConfig masterConfig;
/**
- * zookeeper registry center
- */
- @Autowired
- private ZookeeperRegistryCenter zookeeperRegistryCenter;
-
- /**
* spring application context
* only use it for initialization
*/
@@ -105,6 +95,7 @@ public class MasterServer {
/**
* master registry
*/
+ @Autowired
private MasterRegistry masterRegistry;
/**
@@ -126,7 +117,7 @@ public class MasterServer {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(45678);
+ serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new
TaskAckProcessor());
@@ -134,7 +125,6 @@ public class MasterServer {
this.nettyRemotingServer.start();
//
- this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval());
this.masterRegistry.registry();
//
@@ -166,8 +156,6 @@ public class MasterServer {
logger.error("start Quartz failed", e);
}
- TaskUpdateQueueConsumer taskUpdateQueueConsumer =
SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
- taskUpdateQueueConsumer.start();
/**
* register hooks, which are called before the process exits
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index e8a8ecb..7e6ae56 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -46,6 +46,17 @@ public class MasterConfig {
@Value("${master.host.selector:lowerWeight}")
private String hostSelector;
+ @Value("${master.listen.port:45678}")
+ private int listenPort;
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
public String getHostSelector() {
return hostSelector;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index cccc700..e3957af 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
+
/**
* TaskUpdateQueue consumer
*/
@@ -66,6 +68,12 @@ public class TaskUpdateQueueConsumer extends Thread{
@Autowired
private ExecutorDispatcher dispatcher;
+ @PostConstruct
+ public void init(){
+ super.setName("TaskUpdateQueueConsumerThread");
+ super.start();
+ }
+
@Override
public void run() {
while (Stopper.isRunning()){
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 1eb06b6..0402520 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -23,10 +23,14 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -37,6 +41,7 @@ import static
org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
/**
* master registry
*/
+@Service
public class MasterRegistry {
private final Logger logger =
LoggerFactory.getLogger(MasterRegistry.class);
@@ -44,38 +49,28 @@ public class MasterRegistry {
/**
* zookeeper registry center
*/
- private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+ @Autowired
+ private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
- * port
+ * master config
*/
- private final int port;
-
- /**
- * heartbeat interval
- */
- private final long heartBeatInterval;
+ @Autowired
+ private MasterConfig masterConfig;
/**
* heartbeat executor
*/
- private final ScheduledExecutorService heartBeatExecutor;
+ private ScheduledExecutorService heartBeatExecutor;
/**
* master start time
*/
- private final String startTime;
+ private String startTime;
- /**
- * construct
- * @param zookeeperRegistryCenter zookeeperRegistryCenter
- * @param port port
- * @param heartBeatInterval heartBeatInterval
- */
- public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int
port, long heartBeatInterval){
- this.zookeeperRegistryCenter = zookeeperRegistryCenter;
- this.port = port;
- this.heartBeatInterval = heartBeatInterval;
+
+ @PostConstruct
+ public void init(){
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
}
@@ -100,8 +95,9 @@ public class MasterRegistry {
}
}
});
- this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(),
heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
- logger.info("master node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, heartBeatInterval);
+ int masterHeartbeatInterval =
masterConfig.getMasterHeartbeatInterval();
+ this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(),
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+ logger.info("master node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
/**
@@ -129,7 +125,7 @@ public class MasterRegistry {
* @return Constants.LOCAL_ADDRESS and the configured master listen port, joined by ':'
*/
private String getLocalAddress(){
- return Constants.LOCAL_ADDRESS + ":" + port;
+ return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort();
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 068d546..441e8db 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -19,11 +19,9 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
@@ -37,8 +35,6 @@ import
org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
/**
* worker server
@@ -52,18 +48,6 @@ public class WorkerServer {
private static final Logger logger =
LoggerFactory.getLogger(WorkerServer.class);
/**
- * worker config
- */
- @Autowired
- private WorkerConfig workerConfig;
-
- /**
- * zookeeper registry center
- */
- @Autowired
- private ZookeeperRegistryCenter zookeeperRegistryCenter;
-
- /**
* netty remote server
*/
private NettyRemotingServer nettyRemotingServer;
@@ -71,9 +55,16 @@ public class WorkerServer {
/**
* worker registry
*/
+ @Autowired
private WorkerRegistry workerRegistry;
/**
+ * worker config
+ */
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ /**
* spring application context
* only use it for initialization
*/
@@ -87,6 +78,7 @@ public class WorkerServer {
* @param args arguments
*/
public static void main(String[] args) {
+ System.setProperty("spring.profiles.active","worker");
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
new
SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
}
@@ -101,12 +93,13 @@ public class WorkerServer {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST,
new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new
TaskKillProcessor());
this.nettyRemotingServer.start();
- this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter,
serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(),
workerConfig.getWorkerGroup());
+ //
this.workerRegistry.registry();
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 3c7500a..f3e701b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,6 +40,17 @@ public class WorkerConfig {
@Value("${worker.group: default}")
private String workerGroup;
+ @Value("${worker.listen.port: 12345}")
+ private int listenPort;
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
public String getWorkerGroup() {
return workerGroup;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index a1d5524..b42386a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -25,9 +25,13 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +45,7 @@ import static
org.apache.dolphinscheduler.common.Constants.SLASH;
/**
* worker registry
*/
+@Service
public class WorkerRegistry {
private final Logger logger =
LoggerFactory.getLogger(WorkerRegistry.class);
@@ -48,54 +53,31 @@ public class WorkerRegistry {
/**
* zookeeper registry center
*/
- private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+ @Autowired
+ private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
- * port
+ * worker config
*/
- private final int port;
-
- /**
- * heartbeat interval
- */
- private final long heartBeatInterval;
+ @Autowired
+ private WorkerConfig workerConfig;
/**
* heartbeat executor
*/
- private final ScheduledExecutorService heartBeatExecutor;
+ private ScheduledExecutorService heartBeatExecutor;
/**
* worker start time
*/
- private final String startTime;
+ private String startTime;
- /**
- * worker group
- */
- private String workerGroup;
- /**
- * construct
- *
- * @param zookeeperRegistryCenter zookeeperRegistryCenter
- * @param port port
- * @param heartBeatInterval heartBeatInterval
- */
- public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int
port, long heartBeatInterval){
- this(zookeeperRegistryCenter, port, heartBeatInterval,
DEFAULT_WORKER_GROUP);
- }
+ private String workerGroup;
- /**
- * construct
- * @param zookeeperRegistryCenter zookeeperRegistryCenter
- * @param port port
- */
- public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int
port, long heartBeatInterval, String workerGroup){
- this.zookeeperRegistryCenter = zookeeperRegistryCenter;
- this.port = port;
- this.heartBeatInterval = heartBeatInterval;
- this.workerGroup = workerGroup;
+ @PostConstruct
+ public void init(){
+ this.workerGroup = workerConfig.getWorkerGroup();
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
}
@@ -120,8 +102,9 @@ public class WorkerRegistry {
}
}
});
- this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(),
heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
- logger.info("worker node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, heartBeatInterval);
+ int workerHeartbeatInterval =
workerConfig.getWorkerHeartbeatInterval();
+ this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(),
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
+ logger.info("worker node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, workerHeartbeatInterval);
}
@@ -159,7 +142,7 @@ public class WorkerRegistry {
* @return Constants.LOCAL_ADDRESS and the configured worker listen port, joined by ':'
*/
private String getLocalAddress(){
- return Constants.LOCAL_ADDRESS + ":" + port;
+ return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort();
}
/**