This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 0febd95  Refactor worker (#2115)
0febd95 is described below

commit 0febd9530c0656c9ecac8af32703b817a59174b3
Author: Tboy <[email protected]>
AuthorDate: Sun Mar 8 15:54:55 2020 +0800

    Refactor worker (#2115)
    
    * refactor worker registry
    
    * refactor master server
---
 .../server/master/MasterServer.java                | 85 +++------------------
 ...ulerThread.java => MasterSchedulerService.java} | 82 ++++++++------------
 .../server/registry/ZookeeperNodeManager.java      |  9 +++
 .../server/worker/WorkerServer.java                |  7 --
 .../dolphinscheduler/server/zk/ZKMasterClient.java | 88 ++++------------------
 .../service/quartz/ProcessScheduleJob.java         | 23 ++----
 .../service/zk/AbstractZKClient.java               | 69 +----------------
 7 files changed, 78 insertions(+), 285 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 9ad3597..212e5d9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -18,9 +18,6 @@ package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -29,11 +26,8 @@ import 
org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
-import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
@@ -44,7 +38,6 @@ import 
org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 
 import javax.annotation.PostConstruct;
-import java.util.concurrent.ExecutorService;
 
 /**
  * master server
@@ -58,23 +51,6 @@ public class MasterServer {
     private static final Logger logger = 
LoggerFactory.getLogger(MasterServer.class);
 
     /**
-     *  zk master client
-     */
-    @Autowired
-    private ZKMasterClient zkMasterClient = null;
-
-    /**
-     *  process service
-     */
-    @Autowired
-    protected ProcessService processService;
-
-    /**
-     *  master exec thread pool
-     */
-    private ExecutorService masterSchedulerService;
-
-    /**
      * master config
      */
     @Autowired
@@ -99,6 +75,12 @@ public class MasterServer {
     private MasterRegistry masterRegistry;
 
     /**
+     * zk master client
+     */
+    @Autowired
+    private ZKMasterClient zkMasterClient;
+
+    /**
      * master server startup
      *
      * master server not use web service
@@ -125,27 +107,13 @@ public class MasterServer {
         this.nettyRemotingServer.start();
 
         //
+        this.zkMasterClient.start();
         this.masterRegistry.registry();
 
-        //
-        zkMasterClient.init();
-
-        masterSchedulerService = 
ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
-
-        // master scheduler thread
-        MasterSchedulerThread masterSchedulerThread = new 
MasterSchedulerThread(
-                zkMasterClient,
-                processService,
-                masterConfig.getMasterExecThreads());
-
-        // submit master scheduler thread
-        masterSchedulerService.execute(masterSchedulerThread);
-
         // start QuartzExecutors
         // what system should do if exception
         try {
             logger.info("start Quartz server...");
-            ProcessScheduleJob.init(processService);
             QuartzExecutors.getInstance().start();
         } catch (Exception e) {
             try {
@@ -162,19 +130,15 @@ public class MasterServer {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                if (zkMasterClient.getActiveMasterNum() <= 1) {
-                    zkMasterClient.getAlertDao().sendServerStopedAlert(
-                        1, OSUtils.getHost(), "Master-Server");
-                }
-                close("shutdownhook");
+                close("shutdownHook");
             }
         }));
     }
 
 
     /**
-     * gracefully stop
-     * @param cause why stopping
+     * gracefully close
+     * @param cause
      */
     public void close(String cause) {
 
@@ -197,40 +161,15 @@ public class MasterServer {
             }
             this.nettyRemotingServer.close();
             this.masterRegistry.unRegistry();
+            this.zkMasterClient.close();
 
             //close quartz
             try{
                 QuartzExecutors.getInstance().shutdown();
+                logger.info("Quartz service stopped");
             }catch (Exception e){
                 logger.warn("Quartz service stopped 
exception:{}",e.getMessage());
             }
-
-            logger.info("Quartz service stopped");
-
-            try {
-                ThreadPoolExecutors.getInstance().shutdown();
-            }catch (Exception e){
-                logger.warn("threadPool service stopped 
exception:{}",e.getMessage());
-            }
-
-            logger.info("threadPool service stopped");
-
-            try {
-                masterSchedulerService.shutdownNow();
-            }catch (Exception e){
-                logger.warn("master scheduler service stopped 
exception:{}",e.getMessage());
-            }
-
-            logger.info("master scheduler service stopped");
-
-            try {
-                zkMasterClient.close();
-            }catch (Exception e){
-                logger.warn("zookeeper service stopped 
exception:{}",e.getMessage());
-            }
-
-            logger.info("zookeeper service stopped");
-
         } catch (Exception e) {
             logger.error("master server stop exception ", e);
             System.exit(-1);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
similarity index 66%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 6e96164..a5598ee 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -28,49 +28,43 @@ import 
org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
-import java.util.concurrent.ExecutorService;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  *  master scheduler thread
  */
-public class MasterSchedulerThread implements Runnable {
+@Service
+public class MasterSchedulerService extends Thread {
 
     /**
      * logger of MasterSchedulerThread
      */
-    private static final Logger logger = 
LoggerFactory.getLogger(MasterSchedulerThread.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(MasterSchedulerService.class);
 
     /**
      * master exec service
      */
-    private final ExecutorService masterExecService;
+    private ThreadPoolExecutor masterExecService;
 
     /**
      * dolphinscheduler database interface
      */
-    private final ProcessService processService;
+    @Autowired
+    private ProcessService processService;
 
     /**
      * zookeeper master client
      */
-    private final ZKMasterClient zkMasterClient ;
-
-    /**
-     * master exec thread num
-     */
-    private int masterExecThreadNum;
-
-    /**
-     * master config
-     */
-    private MasterConfig masterConfig;
+    @Autowired
+    private ZKMasterClient zkMasterClient;
 
     /**
      *  netty remoting client
@@ -78,21 +72,25 @@ public class MasterSchedulerThread implements Runnable {
     private NettyRemotingClient nettyRemotingClient;
 
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     /**
      * constructor of MasterSchedulerThread
-     * @param zkClient              zookeeper master client
-     * @param processService            process service
-     * @param masterExecThreadNum   master exec thread num
      */
-    public MasterSchedulerThread(ZKMasterClient zkClient, ProcessService 
processService, int masterExecThreadNum){
-        this.processService = processService;
-        this.zkMasterClient = zkClient;
-        this.masterExecThreadNum = masterExecThreadNum;
-        this.masterExecService = 
ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum);
-        this.masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
-        //
+    @PostConstruct
+    public void init(){
+        this.masterExecService = 
(ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",
 masterConfig.getMasterExecThreads());
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        super.setName("MasterSchedulerThread");
+        super.start();
+    }
+
+    @PreDestroy
+    public void close(){
+        nettyRemotingClient.close();
+        logger.info("master schedule service stopped...");
     }
 
     /**
@@ -100,15 +98,10 @@ public class MasterSchedulerThread implements Runnable {
      */
     @Override
     public void run() {
-        logger.info("master scheduler start successfully...");
+        logger.info("master scheduler started");
         while (Stopper.isRunning()){
-
-            // process instance
-            ProcessInstance processInstance = null;
-
             InterProcessMutex mutex = null;
             try {
-
                 boolean runCheckFlag = 
OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), 
masterConfig.getMasterReservedMemory());
                 if(!runCheckFlag) {
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
@@ -116,21 +109,16 @@ public class MasterSchedulerThread implements Runnable {
                 }
                 if (zkMasterClient.getZkClient().getState() == 
CuratorFrameworkState.STARTED) {
 
-                    // create distributed lock with the root node path of the 
lock space as /dolphinscheduler/lock/masters
-                    String znodeLock = zkMasterClient.getMasterLockPath();
+                    mutex = zkMasterClient.blockAcquireMutex();
 
-                    mutex = new 
InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
-                    mutex.acquire();
-
-                    ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) 
masterExecService;
-                    int activeCount = poolExecutor.getActiveCount();
+                    int activeCount = masterExecService.getActiveCount();
                     // make sure to scan and delete command  table in one 
transaction
                     Command command = processService.findOneCommand();
                     if (command != null) {
                         logger.info(String.format("find one command: id: %d, 
type: %s", command.getId(),command.getCommandType().toString()));
 
                         try{
-                            processInstance = 
processService.handleCommand(logger, OSUtils.getHost(), 
this.masterExecThreadNum - activeCount, command);
+                            ProcessInstance processInstance = 
processService.handleCommand(logger, OSUtils.getHost(), 
this.masterConfig.getMasterExecThreads() - activeCount, command);
                             if (processInstance != null) {
                                 logger.info("start master exec thread , split 
DAG ...");
                                 masterExecService.execute(new 
MasterExecThread(processInstance, processService, nettyRemotingClient));
@@ -144,15 +132,11 @@ public class MasterSchedulerThread implements Runnable {
                         Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     }
                 }
-            }catch (Exception e){
+            } catch (Exception e){
                 logger.error("master scheduler thread exception",e);
-            }finally{
-                AbstractZKClient.releaseMutex(mutex);
+            } finally{
+                zkMasterClient.releaseMutex(mutex);
             }
         }
-        nettyRemotingClient.close();
-        logger.info("master server stopped...");
     }
-
-
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 9a4a7ca..a437888 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.service.zk.AbstractListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +75,12 @@ public class ZookeeperNodeManager implements 
InitializingBean {
     private ZookeeperRegistryCenter registryCenter;
 
     /**
+     * alert dao
+     */
+    @Autowired
+    private AlertDao alertDao;
+
+    /**
      * init listener
      * @throws Exception
      */
@@ -136,6 +143,7 @@ public class ZookeeperNodeManager implements 
InitializingBean {
                         Set<String> previousNodes = new HashSet<>(workerNodes);
                         Set<String> currentNodes = 
registryCenter.getWorkerGroupNodesDirectly(group);
                         syncWorkerGroupNodes(group, currentNodes);
+                        alertDao.sendServerStopedAlert(1, path, "WORKER");
                     }
                 } catch (IllegalArgumentException ignore) {
                     logger.warn(ignore.getMessage());
@@ -175,6 +183,7 @@ public class ZookeeperNodeManager implements 
InitializingBean {
                         Set<String> previousNodes = new HashSet<>(masterNodes);
                         Set<String> currentNodes = 
registryCenter.getMasterNodesDirectly();
                         syncMasterNodes(currentNodes);
+                        alertDao.sendServerStopedAlert(1, path, "MASTER");
                     }
                 } catch (Exception ex) {
                     logger.error("MasterNodeListener capture data change and 
get data failed.", ex);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 441e8db..e1872f7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -78,7 +77,6 @@ public class WorkerServer {
      * @param args arguments
      */
     public static void main(String[] args) {
-        System.setProperty("spring.profiles.active","worker");
         Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
         new 
SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
     }
@@ -136,11 +134,6 @@ public class WorkerServer {
             this.nettyRemotingServer.close();
             this.workerRegistry.unRegistry();
 
-            try {
-                ThreadPoolExecutors.getInstance().shutdown();
-            }catch (Exception e){
-                logger.warn("threadPool service stopped 
exception:{}",e.getMessage());
-            }
         } catch (Exception e) {
             logger.error("worker server stop exception ", e);
             System.exit(-1);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 7fc91dc..a59cf3e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -16,22 +16,19 @@
  */
 package org.apache.dolphinscheduler.server.zk;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
 import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.DaoFactory;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.utils.ThreadUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
 import org.slf4j.Logger;
@@ -41,7 +38,6 @@ import org.springframework.stereotype.Component;
 
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.ThreadFactory;
 
 
 /**
@@ -58,45 +54,18 @@ public class ZKMasterClient extends AbstractZKClient {
        private static final Logger logger = 
LoggerFactory.getLogger(ZKMasterClient.class);
 
        /**
-        * thread factory
-        */
-       private static final ThreadFactory defaultThreadFactory = 
ThreadUtils.newGenericThreadFactory("Master-Main-Thread");
-
-       /**
-        *  master znode
-        */
-       private String masterZNode = null;
-
-       /**
-        *  alert database access
-        */
-       private AlertDao alertDao = null;
-       /**
         *  process service
         */
        @Autowired
        private ProcessService processService;
 
-       /**
-        * default constructor
-        */
-       private ZKMasterClient(){}
-
-       /**
-        * init
-        */
-       public void init(){
-
-               logger.info("initialize master client...");
-
-               // init dao
-               this.initDao();
+    public void start() {
 
                InterProcessMutex mutex = null;
                try {
                        // create distributed lock with the root node path of 
the lock space as /dolphinscheduler/lock/failover/master
                        String znodeLock = getMasterStartUpLockPath();
-                       mutex = new InterProcessMutex(zkClient, znodeLock);
+                       mutex = new InterProcessMutex(getZkClient(), znodeLock);
                        mutex.acquire();
 
                        // init system znode
@@ -115,20 +84,9 @@ public class ZKMasterClient extends AbstractZKClient {
                }
        }
 
-
-       /**
-        *  init dao
-        */
-       public void initDao(){
-               this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
-       }
-       /**
-        * get alert dao
-        *
-        * @return AlertDao
-        */
-       public AlertDao getAlertDao() {
-               return alertDao;
+       @Override
+       public void close(){
+               super.close();
        }
 
        /**
@@ -167,8 +125,6 @@ public class ZKMasterClient extends AbstractZKClient {
                        String serverHost = getHostByEventDataPath(path);
                        // handle dead server
                        handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
-                       //alert server down.
-                       alertServerDown(serverHost, zkNodeType);
                        //failover server
                        if(failover){
                                failoverServerWhenDown(serverHost, zkNodeType);
@@ -223,18 +179,6 @@ public class ZKMasterClient extends AbstractZKClient {
        }
 
        /**
-        * send alert when server down
-        *
-        * @param serverHost    server host
-        * @param zkNodeType    zookeeper node type
-        */
-       private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
-
-               String serverType = zkNodeType.toString();
-               alertDao.sendServerStopedAlert(1, serverHost, serverType);
-       }
-
-       /**
         * monitor master
         * @param event event
         * @param path path
@@ -271,16 +215,6 @@ public class ZKMasterClient extends AbstractZKClient {
                }
        }
 
-
-       /**
-        * get master znode
-        *
-        * @return master zookeeper node
-        */
-       public String getMasterZNode() {
-               return masterZNode;
-       }
-
        /**
         * task needs failover if task start before worker starts
      *
@@ -399,4 +333,10 @@ public class ZKMasterClient extends AbstractZKClient {
                logger.info("master failover end");
        }
 
+       public InterProcessMutex blockAcquireMutex() throws Exception {
+        InterProcessMutex mutex = new InterProcessMutex(getZkClient(), 
getMasterLockPath());
+        mutex.acquire();
+        return mutex;
+       }
+
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 69a80e6..d055e2d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
@@ -44,18 +45,8 @@ public class ProcessScheduleJob implements Job {
      */
     private static final Logger logger = 
LoggerFactory.getLogger(ProcessScheduleJob.class);
 
-    /**
-     * process service
-     */
-    private static ProcessService processService;
-
-
-    /**
-     * init
-     * @param processService process dao
-     */
-    public static void init(ProcessService processService) {
-        ProcessScheduleJob.processService = processService;
+    public ProcessService getProcessService(){
+        return SpringApplicationContext.getBean(ProcessService.class);
     }
 
     /**
@@ -67,7 +58,7 @@ public class ProcessScheduleJob implements Job {
     @Override
     public void execute(JobExecutionContext context) throws 
JobExecutionException {
 
-        Assert.notNull(processService, "please call init() method first");
+        Assert.notNull(getProcessService(), "please call init() method first");
 
         JobDataMap dataMap = context.getJobDetail().getJobDataMap();
 
@@ -83,7 +74,7 @@ public class ProcessScheduleJob implements Job {
         logger.info("scheduled fire time :{}, fire time :{}, process id :{}", 
scheduledFireTime, fireTime, scheduleId);
 
         // query schedule
-        Schedule schedule = processService.querySchedule(scheduleId);
+        Schedule schedule = getProcessService().querySchedule(scheduleId);
         if (schedule == null) {
             logger.warn("process schedule does not exist in db,delete schedule 
job in quartz, projectId:{}, scheduleId:{}", projectId, scheduleId);
             deleteJob(projectId, scheduleId);
@@ -91,7 +82,7 @@ public class ProcessScheduleJob implements Job {
         }
 
 
-        ProcessDefinition processDefinition = 
processService.findProcessDefineById(schedule.getProcessDefinitionId());
+        ProcessDefinition processDefinition = 
getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
         // release state : online/offline
         ReleaseState releaseState = processDefinition.getReleaseState();
         if (processDefinition == null || releaseState == ReleaseState.OFFLINE) 
{
@@ -111,7 +102,7 @@ public class ProcessScheduleJob implements Job {
         command.setWarningType(schedule.getWarningType());
         
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
 
-        processService.createCommand(command);
+        getProcessService().createCommand(command);
     }
 
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 24bf259..0b9fbe4 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -16,19 +16,15 @@
  */
 package org.apache.dolphinscheduler.service.zk;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
 import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ResInfo;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 import java.util.*;
 
@@ -37,16 +33,12 @@ import static 
org.apache.dolphinscheduler.common.Constants.*;
 /**
  * abstract zookeeper client
  */
+@Component
 public abstract class AbstractZKClient extends ZookeeperCachedOperator {
 
        private static final Logger logger = 
LoggerFactory.getLogger(AbstractZKClient.class);
 
        /**
-        * server stop or not
-        */
-       protected IStoppable stoppable = null;
-
-       /**
         *      check dead server or not , if dead, stop self
         *
         * @param zNode          node path
@@ -65,8 +57,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
                if(!isExisted(zNode) || isExisted(deadServerPath)){
                        return true;
                }
-
-
                return false;
        }
 
@@ -100,28 +90,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
        }
 
        /**
-        * register server,  if server already exists, return null.
-        * @param zkNodeType zookeeper node type
-        * @return register server path in zookeeper
-        * @throws Exception errors
-        */
-       public String registerServer(ZKNodeType zkNodeType) throws Exception {
-               String registerPath = null;
-               String host = OSUtils.getHost();
-               if(checkZKNodeExists(host, zkNodeType)){
-                       logger.error("register failure , {} server already 
started on host : {}" ,
-                                       zkNodeType.toString(), host);
-                       return registerPath;
-               }
-               registerPath = createZNodePath(zkNodeType, host);
-
-    // handle dead server
-               handleDeadServer(registerPath, zkNodeType, 
Constants.DELETE_ZK_OP);
-
-               return registerPath;
-       }
-
-       /**
         * opType(add): if find dead server , then add to zk deadServerPath
         * opType(delete): delete path from zk
         *
@@ -152,16 +120,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
 
        }
 
-
-
-       /**
-        * for stop server
-        * @param serverStoppable server stoppable interface
-        */
-       public void setStoppable(IStoppable serverStoppable){
-               this.stoppable = serverStoppable;
-       }
-
        /**
         * get active master num
         * @return active master number
@@ -277,14 +235,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
 
        /**
         *
-        * @return get master lock path
-        */
-       public String getWorkerLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS;
-       }
-
-       /**
-        *
         * @param zkNodeType zookeeper node type
         * @return get zookeeper node parent path
         */
@@ -339,7 +289,7 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
         * release mutex
         * @param mutex mutex
         */
-       public static void releaseMutex(InterProcessMutex mutex) {
+       public void releaseMutex(InterProcessMutex mutex) {
                if (mutex != null){
                        try {
                                mutex.release();
@@ -387,18 +337,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
                return pathArray[pathArray.length - 1];
 
        }
-       /**
-        * acquire zk lock
-        * @param zkClient zk client
-        * @param zNodeLockPath zk lock path
-        * @return zk lock
-        * @throws Exception errors
-        */
-       public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String 
zNodeLockPath)throws Exception{
-               InterProcessMutex mutex = new InterProcessMutex(zkClient, 
zNodeLockPath);
-               mutex.acquire();
-               return mutex;
-       }
 
        @Override
        public String toString() {
@@ -407,7 +345,6 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
                                ", deadServerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
                                ", masterZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
                                ", workerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
-                               ", stoppable=" + stoppable +
                                '}';
        }
 }

Reply via email to