This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new d617f9d  refactor worker registry (#2107)
d617f9d is described below

commit d617f9df3281a7097890aa81dbd5d4f95456c243
Author: Tboy <[email protected]>
AuthorDate: Sun Mar 8 12:06:31 2020 +0800

    refactor worker registry (#2107)
---
 .../server/master/MasterServer.java                | 16 +-----
 .../server/master/config/MasterConfig.java         | 11 +++++
 .../master/consumer/TaskUpdateQueueConsumer.java   |  8 +++
 .../server/master/registry/MasterRegistry.java     | 42 ++++++++--------
 .../server/worker/WorkerServer.java                | 27 ++++------
 .../server/worker/config/WorkerConfig.java         | 11 +++++
 .../server/worker/registry/WorkerRegistry.java     | 57 ++++++++--------------
 7 files changed, 81 insertions(+), 91 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 292bfae..9ad3597 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -17,7 +17,6 @@
 package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -26,19 +25,16 @@ import 
org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import 
org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,12 +81,6 @@ public class MasterServer {
     private MasterConfig masterConfig;
 
     /**
-     *  zookeeper registry center
-     */
-    @Autowired
-    private ZookeeperRegistryCenter zookeeperRegistryCenter;
-
-    /**
      *  spring application context
      *  only use it for initialization
      */
@@ -105,6 +95,7 @@ public class MasterServer {
     /**
      * master registry
      */
+    @Autowired
     private MasterRegistry masterRegistry;
 
     /**
@@ -126,7 +117,7 @@ public class MasterServer {
 
         //init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(45678);
+        serverConfig.setListenPort(masterConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
new TaskResponseProcessor());
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new 
TaskAckProcessor());
@@ -134,7 +125,6 @@ public class MasterServer {
         this.nettyRemotingServer.start();
 
         //
-        this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, 
serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval());
         this.masterRegistry.registry();
 
         //
@@ -166,8 +156,6 @@ public class MasterServer {
             logger.error("start Quartz failed", e);
         }
 
-        TaskUpdateQueueConsumer taskUpdateQueueConsumer = 
SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
-        taskUpdateQueueConsumer.start();
         /**
          *  register hooks, which are called before the process exits
          */
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index e8a8ecb..7e6ae56 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -46,6 +46,17 @@ public class MasterConfig {
     @Value("${master.host.selector:lowerWeight}")
     private String hostSelector;
 
+    @Value("${master.listen.port:45678}")
+    private int listenPort;
+
+    public int getListenPort() {
+        return listenPort;
+    }
+
+    public void setListenPort(int listenPort) {
+        this.listenPort = listenPort;
+    }
+
     public String getHostSelector() {
         return hostSelector;
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index cccc700..e3957af 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+
 /**
  * TaskUpdateQueue consumer
  */
@@ -66,6 +68,12 @@ public class TaskUpdateQueueConsumer extends Thread{
     @Autowired
     private ExecutorDispatcher dispatcher;
 
+    @PostConstruct
+    public void init(){
+        super.setName("TaskUpdateQueueConsumerThread");
+        super.start();
+    }
+
     @Override
     public void run() {
         while (Stopper.isRunning()){
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 1eb06b6..0402520 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -23,10 +23,14 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import java.util.Date;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,6 +41,7 @@ import static 
org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
 /**
  *  master registry
  */
+@Service
 public class MasterRegistry {
 
     private final Logger logger = 
LoggerFactory.getLogger(MasterRegistry.class);
@@ -44,38 +49,28 @@ public class MasterRegistry {
     /**
      *  zookeeper registry center
      */
-    private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     /**
-     *  port
+     * master config
      */
-    private final int port;
-
-    /**
-     * heartbeat interval
-     */
-    private final long heartBeatInterval;
+    @Autowired
+    private MasterConfig masterConfig;
 
     /**
      * heartbeat executor
      */
-    private final ScheduledExecutorService heartBeatExecutor;
+    private ScheduledExecutorService heartBeatExecutor;
 
     /**
      * worker start time
      */
-    private final String startTime;
+    private String startTime;
 
-    /**
-     * construct
-     * @param zookeeperRegistryCenter zookeeperRegistryCenter
-     * @param port port
-     * @param heartBeatInterval heartBeatInterval
-     */
-    public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int 
port, long heartBeatInterval){
-        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
-        this.port = port;
-        this.heartBeatInterval = heartBeatInterval;
+
+    @PostConstruct
+    public void init(){
         this.startTime = DateUtils.dateToString(new Date());
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
     }
@@ -100,8 +95,9 @@ public class MasterRegistry {
                 }
             }
         });
-        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), 
heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
-        logger.info("master node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, heartBeatInterval);
+        int masterHeartbeatInterval = 
masterConfig.getMasterHeartbeatInterval();
+        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), 
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("master node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, masterHeartbeatInterval);
     }
 
     /**
@@ -129,7 +125,7 @@ public class MasterRegistry {
      * @return
      */
     private String getLocalAddress(){
-        return Constants.LOCAL_ADDRESS + ":" + port;
+        return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort();
     }
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 068d546..441e8db 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -19,11 +19,9 @@ package org.apache.dolphinscheduler.server.worker;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
@@ -37,8 +35,6 @@ import 
org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 
 import javax.annotation.PostConstruct;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 
 /**
  *  worker server
@@ -52,18 +48,6 @@ public class WorkerServer {
     private static final Logger logger = 
LoggerFactory.getLogger(WorkerServer.class);
 
     /**
-     *  worker config
-     */
-    @Autowired
-    private WorkerConfig workerConfig;
-
-    /**
-     *  zookeeper registry center
-     */
-    @Autowired
-    private ZookeeperRegistryCenter zookeeperRegistryCenter;
-
-    /**
      *  netty remote server
      */
     private NettyRemotingServer nettyRemotingServer;
@@ -71,9 +55,16 @@ public class WorkerServer {
     /**
      *  worker registry
      */
+    @Autowired
     private WorkerRegistry workerRegistry;
 
     /**
+     *  worker config
+     */
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    /**
      *  spring application context
      *  only use it for initialization
      */
@@ -87,6 +78,7 @@ public class WorkerServer {
      * @param args arguments
      */
     public static void main(String[] args) {
+        System.setProperty("spring.profiles.active","worker");
         Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
         new 
SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
     }
@@ -101,12 +93,13 @@ public class WorkerServer {
 
         //init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(workerConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, 
new TaskExecuteProcessor());
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new 
TaskKillProcessor());
         this.nettyRemotingServer.start();
 
-        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, 
serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), 
workerConfig.getWorkerGroup());
+        //
         this.workerRegistry.registry();
 
         /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 3c7500a..f3e701b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,6 +40,17 @@ public class WorkerConfig {
     @Value("${worker.group: default}")
     private String workerGroup;
 
+    @Value("${worker.listen.port: 12345}")
+    private int listenPort;
+
+    public int getListenPort() {
+        return listenPort;
+    }
+
+    public void setListenPort(int listenPort) {
+        this.listenPort = listenPort;
+    }
+
     public String getWorkerGroup() {
         return workerGroup;
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index a1d5524..b42386a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -25,9 +25,13 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import java.util.Date;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +45,7 @@ import static 
org.apache.dolphinscheduler.common.Constants.SLASH;
 /**
  *  worker registry
  */
+@Service
 public class WorkerRegistry {
 
     private final Logger logger = 
LoggerFactory.getLogger(WorkerRegistry.class);
@@ -48,54 +53,31 @@ public class WorkerRegistry {
     /**
      *  zookeeper registry center
      */
-    private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     /**
-     *  port
+     *  worker config
      */
-    private final int port;
-
-    /**
-     * heartbeat interval
-     */
-    private final long heartBeatInterval;
+    @Autowired
+    private WorkerConfig workerConfig;
 
     /**
      * heartbeat executor
      */
-    private final ScheduledExecutorService heartBeatExecutor;
+    private ScheduledExecutorService heartBeatExecutor;
 
     /**
      * worker start time
      */
-    private final String startTime;
+    private String startTime;
 
-    /**
-     * worker group
-     */
-    private String workerGroup;
 
-    /**
-     * construct
-     *
-     * @param zookeeperRegistryCenter zookeeperRegistryCenter
-     * @param port port
-     * @param heartBeatInterval heartBeatInterval
-     */
-    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int 
port, long heartBeatInterval){
-        this(zookeeperRegistryCenter, port, heartBeatInterval, 
DEFAULT_WORKER_GROUP);
-    }
+    private String workerGroup;
 
-    /**
-     *  construct
-     * @param zookeeperRegistryCenter zookeeperRegistryCenter
-     * @param port port
-     */
-    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int 
port, long heartBeatInterval, String workerGroup){
-        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
-        this.port = port;
-        this.heartBeatInterval = heartBeatInterval;
-        this.workerGroup = workerGroup;
+    @PostConstruct
+    public void init(){
+        this.workerGroup = workerConfig.getWorkerGroup();
         this.startTime = DateUtils.dateToString(new Date());
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
     }
@@ -120,8 +102,9 @@ public class WorkerRegistry {
                 }
             }
         });
-        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), 
heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
-        logger.info("worker node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, heartBeatInterval);
+        int workerHeartbeatInterval = 
workerConfig.getWorkerHeartbeatInterval();
+        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), 
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("worker node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, workerHeartbeatInterval);
 
     }
 
@@ -159,7 +142,7 @@ public class WorkerRegistry {
      * @return
      */
     private String getLocalAddress(){
-        return Constants.LOCAL_ADDRESS + ":" + port;
+        return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort();
     }
 
     /**

Reply via email to