This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 0febd95 Refactor worker (#2115)
0febd95 is described below
commit 0febd9530c0656c9ecac8af32703b817a59174b3
Author: Tboy <[email protected]>
AuthorDate: Sun Mar 8 15:54:55 2020 +0800
Refactor worker (#2115)
* refactor worker registry
* refactor master server
---
.../server/master/MasterServer.java | 85 +++------------------
...ulerThread.java => MasterSchedulerService.java} | 82 ++++++++------------
.../server/registry/ZookeeperNodeManager.java | 9 +++
.../server/worker/WorkerServer.java | 7 --
.../dolphinscheduler/server/zk/ZKMasterClient.java | 88 ++++------------------
.../service/quartz/ProcessScheduleJob.java | 23 ++----
.../service/zk/AbstractZKClient.java | 69 +----------------
7 files changed, 78 insertions(+), 285 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 9ad3597..212e5d9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -18,9 +18,6 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -29,11 +26,8 @@ import
org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
-import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
@@ -44,7 +38,6 @@ import
org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
-import java.util.concurrent.ExecutorService;
/**
* master server
@@ -58,23 +51,6 @@ public class MasterServer {
private static final Logger logger =
LoggerFactory.getLogger(MasterServer.class);
/**
- * zk master client
- */
- @Autowired
- private ZKMasterClient zkMasterClient = null;
-
- /**
- * process service
- */
- @Autowired
- protected ProcessService processService;
-
- /**
- * master exec thread pool
- */
- private ExecutorService masterSchedulerService;
-
- /**
* master config
*/
@Autowired
@@ -99,6 +75,12 @@ public class MasterServer {
private MasterRegistry masterRegistry;
/**
+ * zk master client
+ */
+ @Autowired
+ private ZKMasterClient zkMasterClient;
+
+ /**
* master server startup
*
* master server not use web service
@@ -125,27 +107,13 @@ public class MasterServer {
this.nettyRemotingServer.start();
//
+ this.zkMasterClient.start();
this.masterRegistry.registry();
- //
- zkMasterClient.init();
-
- masterSchedulerService =
ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
-
- // master scheduler thread
- MasterSchedulerThread masterSchedulerThread = new
MasterSchedulerThread(
- zkMasterClient,
- processService,
- masterConfig.getMasterExecThreads());
-
- // submit master scheduler thread
- masterSchedulerService.execute(masterSchedulerThread);
-
// start QuartzExecutors
// what system should do if exception
try {
logger.info("start Quartz server...");
- ProcessScheduleJob.init(processService);
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
@@ -162,19 +130,15 @@ public class MasterServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- if (zkMasterClient.getActiveMasterNum() <= 1) {
- zkMasterClient.getAlertDao().sendServerStopedAlert(
- 1, OSUtils.getHost(), "Master-Server");
- }
- close("shutdownhook");
+ close("shutdownHook");
}
}));
}
/**
- * gracefully stop
- * @param cause why stopping
+ * gracefully close
+ * @param cause
*/
public void close(String cause) {
@@ -197,40 +161,15 @@ public class MasterServer {
}
this.nettyRemotingServer.close();
this.masterRegistry.unRegistry();
+ this.zkMasterClient.close();
//close quartz
try{
QuartzExecutors.getInstance().shutdown();
+ logger.info("Quartz service stopped");
}catch (Exception e){
logger.warn("Quartz service stopped
exception:{}",e.getMessage());
}
-
- logger.info("Quartz service stopped");
-
- try {
- ThreadPoolExecutors.getInstance().shutdown();
- }catch (Exception e){
- logger.warn("threadPool service stopped
exception:{}",e.getMessage());
- }
-
- logger.info("threadPool service stopped");
-
- try {
- masterSchedulerService.shutdownNow();
- }catch (Exception e){
- logger.warn("master scheduler service stopped
exception:{}",e.getMessage());
- }
-
- logger.info("master scheduler service stopped");
-
- try {
- zkMasterClient.close();
- }catch (Exception e){
- logger.warn("zookeeper service stopped
exception:{}",e.getMessage());
- }
-
- logger.info("zookeeper service stopped");
-
} catch (Exception e) {
logger.error("master server stop exception ", e);
System.exit(-1);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
similarity index 66%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
rename to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 6e96164..a5598ee 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -28,49 +28,43 @@ import
org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
-import java.util.concurrent.ExecutorService;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.util.concurrent.ThreadPoolExecutor;
/**
* master scheduler thread
*/
-public class MasterSchedulerThread implements Runnable {
+@Service
+public class MasterSchedulerService extends Thread {
/**
* logger of MasterSchedulerThread
*/
- private static final Logger logger =
LoggerFactory.getLogger(MasterSchedulerThread.class);
+ private static final Logger logger =
LoggerFactory.getLogger(MasterSchedulerService.class);
/**
* master exec service
*/
- private final ExecutorService masterExecService;
+ private ThreadPoolExecutor masterExecService;
/**
* dolphinscheduler database interface
*/
- private final ProcessService processService;
+ @Autowired
+ private ProcessService processService;
/**
* zookeeper master client
*/
- private final ZKMasterClient zkMasterClient ;
-
- /**
- * master exec thread num
- */
- private int masterExecThreadNum;
-
- /**
- * master config
- */
- private MasterConfig masterConfig;
+ @Autowired
+ private ZKMasterClient zkMasterClient;
/**
* netty remoting client
@@ -78,21 +72,25 @@ public class MasterSchedulerThread implements Runnable {
private NettyRemotingClient nettyRemotingClient;
+ @Autowired
+ private MasterConfig masterConfig;
+
/**
* constructor of MasterSchedulerThread
- * @param zkClient zookeeper master client
- * @param processService process service
- * @param masterExecThreadNum master exec thread num
*/
- public MasterSchedulerThread(ZKMasterClient zkClient, ProcessService
processService, int masterExecThreadNum){
- this.processService = processService;
- this.zkMasterClient = zkClient;
- this.masterExecThreadNum = masterExecThreadNum;
- this.masterExecService =
ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum);
- this.masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
- //
+ @PostConstruct
+ public void init(){
+ this.masterExecService =
(ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",
masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+ super.setName("MasterSchedulerThread");
+ super.start();
+ }
+
+ @PreDestroy
+ public void close(){
+ nettyRemotingClient.close();
+ logger.info("master schedule service stopped...");
}
/**
@@ -100,15 +98,10 @@ public class MasterSchedulerThread implements Runnable {
*/
@Override
public void run() {
- logger.info("master scheduler start successfully...");
+ logger.info("master scheduler started");
while (Stopper.isRunning()){
-
- // process instance
- ProcessInstance processInstance = null;
-
InterProcessMutex mutex = null;
try {
-
boolean runCheckFlag =
OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory());
if(!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
@@ -116,21 +109,16 @@ public class MasterSchedulerThread implements Runnable {
}
if (zkMasterClient.getZkClient().getState() ==
CuratorFrameworkState.STARTED) {
- // create distributed lock with the root node path of the
lock space as /dolphinscheduler/lock/masters
- String znodeLock = zkMasterClient.getMasterLockPath();
+ mutex = zkMasterClient.blockAcquireMutex();
- mutex = new
InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
- mutex.acquire();
-
- ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor)
masterExecService;
- int activeCount = poolExecutor.getActiveCount();
+ int activeCount = masterExecService.getActiveCount();
// make sure to scan and delete command table in one
transaction
Command command = processService.findOneCommand();
if (command != null) {
logger.info(String.format("find one command: id: %d,
type: %s", command.getId(),command.getCommandType().toString()));
try{
- processInstance =
processService.handleCommand(logger, OSUtils.getHost(),
this.masterExecThreadNum - activeCount, command);
+ ProcessInstance processInstance =
processService.handleCommand(logger, OSUtils.getHost(),
this.masterConfig.getMasterExecThreads() - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split
DAG ...");
masterExecService.execute(new
MasterExecThread(processInstance, processService, nettyRemotingClient));
@@ -144,15 +132,11 @@ public class MasterSchedulerThread implements Runnable {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
- }catch (Exception e){
+ } catch (Exception e){
logger.error("master scheduler thread exception",e);
- }finally{
- AbstractZKClient.releaseMutex(mutex);
+ } finally{
+ zkMasterClient.releaseMutex(mutex);
}
}
- nettyRemotingClient.close();
- logger.info("master server stopped...");
}
-
-
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 9a4a7ca..a437888 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,12 @@ public class ZookeeperNodeManager implements
InitializingBean {
private ZookeeperRegistryCenter registryCenter;
/**
+ * alert dao
+ */
+ @Autowired
+ private AlertDao alertDao;
+
+ /**
* init listener
* @throws Exception
*/
@@ -136,6 +143,7 @@ public class ZookeeperNodeManager implements
InitializingBean {
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes =
registryCenter.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
+ alertDao.sendServerStopedAlert(1, path, "WORKER");
}
} catch (IllegalArgumentException ignore) {
logger.warn(ignore.getMessage());
@@ -175,6 +183,7 @@ public class ZookeeperNodeManager implements
InitializingBean {
Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes =
registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes);
+ alertDao.sendServerStopedAlert(1, path, "MASTER");
}
} catch (Exception ex) {
logger.error("MasterNodeListener capture data change and
get data failed.", ex);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 441e8db..e1872f7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -78,7 +77,6 @@ public class WorkerServer {
* @param args arguments
*/
public static void main(String[] args) {
- System.setProperty("spring.profiles.active","worker");
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
new
SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
}
@@ -136,11 +134,6 @@ public class WorkerServer {
this.nettyRemotingServer.close();
this.workerRegistry.unRegistry();
- try {
- ThreadPoolExecutors.getInstance().shutdown();
- }catch (Exception e){
- logger.warn("threadPool service stopped
exception:{}",e.getMessage());
- }
} catch (Exception e) {
logger.error("worker server stop exception ", e);
System.exit(-1);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 7fc91dc..a59cf3e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -16,22 +16,19 @@
*/
package org.apache.dolphinscheduler.server.zk;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.utils.ThreadUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
@@ -41,7 +38,6 @@ import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.ThreadFactory;
/**
@@ -58,45 +54,18 @@ public class ZKMasterClient extends AbstractZKClient {
private static final Logger logger =
LoggerFactory.getLogger(ZKMasterClient.class);
/**
- * thread factory
- */
- private static final ThreadFactory defaultThreadFactory =
ThreadUtils.newGenericThreadFactory("Master-Main-Thread");
-
- /**
- * master znode
- */
- private String masterZNode = null;
-
- /**
- * alert database access
- */
- private AlertDao alertDao = null;
- /**
* process service
*/
@Autowired
private ProcessService processService;
- /**
- * default constructor
- */
- private ZKMasterClient(){}
-
- /**
- * init
- */
- public void init(){
-
- logger.info("initialize master client...");
-
- // init dao
- this.initDao();
+ public void start() {
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of
the lock space as /dolphinscheduler/lock/failover/master
String znodeLock = getMasterStartUpLockPath();
- mutex = new InterProcessMutex(zkClient, znodeLock);
+ mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire();
// init system znode
@@ -115,20 +84,9 @@ public class ZKMasterClient extends AbstractZKClient {
}
}
-
- /**
- * init dao
- */
- public void initDao(){
- this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
- }
- /**
- * get alert dao
- *
- * @return AlertDao
- */
- public AlertDao getAlertDao() {
- return alertDao;
+ @Override
+ public void close(){
+ super.close();
}
/**
@@ -167,8 +125,6 @@ public class ZKMasterClient extends AbstractZKClient {
String serverHost = getHostByEventDataPath(path);
// handle dead server
handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
- //alert server down.
- alertServerDown(serverHost, zkNodeType);
//failover server
if(failover){
failoverServerWhenDown(serverHost, zkNodeType);
@@ -223,18 +179,6 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
- * send alert when server down
- *
- * @param serverHost server host
- * @param zkNodeType zookeeper node type
- */
- private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
-
- String serverType = zkNodeType.toString();
- alertDao.sendServerStopedAlert(1, serverHost, serverType);
- }
-
- /**
* monitor master
* @param event event
* @param path path
@@ -271,16 +215,6 @@ public class ZKMasterClient extends AbstractZKClient {
}
}
-
- /**
- * get master znode
- *
- * @return master zookeeper node
- */
- public String getMasterZNode() {
- return masterZNode;
- }
-
/**
* task needs failover if task start before worker starts
*
@@ -399,4 +333,10 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("master failover end");
}
+ public InterProcessMutex blockAcquireMutex() throws Exception {
+ InterProcessMutex mutex = new InterProcessMutex(getZkClient(),
getMasterLockPath());
+ mutex.acquire();
+ return mutex;
+ }
+
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 69a80e6..d055e2d 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.quartz.Job;
import org.quartz.JobDataMap;
@@ -44,18 +45,8 @@ public class ProcessScheduleJob implements Job {
*/
private static final Logger logger =
LoggerFactory.getLogger(ProcessScheduleJob.class);
- /**
- * process service
- */
- private static ProcessService processService;
-
-
- /**
- * init
- * @param processService process dao
- */
- public static void init(ProcessService processService) {
- ProcessScheduleJob.processService = processService;
+ public ProcessService getProcessService(){
+ return SpringApplicationContext.getBean(ProcessService.class);
}
/**
@@ -67,7 +58,7 @@ public class ProcessScheduleJob implements Job {
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- Assert.notNull(processService, "please call init() method first");
+ Assert.notNull(getProcessService(), "please call init() method first");
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
@@ -83,7 +74,7 @@ public class ProcessScheduleJob implements Job {
logger.info("scheduled fire time :{}, fire time :{}, process id :{}",
scheduledFireTime, fireTime, scheduleId);
// query schedule
- Schedule schedule = processService.querySchedule(scheduleId);
+ Schedule schedule = getProcessService().querySchedule(scheduleId);
if (schedule == null) {
logger.warn("process schedule does not exist in db,delete schedule
job in quartz, projectId:{}, scheduleId:{}", projectId, scheduleId);
deleteJob(projectId, scheduleId);
@@ -91,7 +82,7 @@ public class ProcessScheduleJob implements Job {
}
- ProcessDefinition processDefinition =
processService.findProcessDefineById(schedule.getProcessDefinitionId());
+ ProcessDefinition processDefinition =
getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState();
if (processDefinition == null || releaseState == ReleaseState.OFFLINE)
{
@@ -111,7 +102,7 @@ public class ProcessScheduleJob implements Job {
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
- processService.createCommand(command);
+ getProcessService().createCommand(command);
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 24bf259..0b9fbe4 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -16,19 +16,15 @@
*/
package org.apache.dolphinscheduler.service.zk;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import java.util.*;
@@ -37,16 +33,12 @@ import static
org.apache.dolphinscheduler.common.Constants.*;
/**
* abstract zookeeper client
*/
+@Component
public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger =
LoggerFactory.getLogger(AbstractZKClient.class);
/**
- * server stop or not
- */
- protected IStoppable stoppable = null;
-
- /**
* check dead server or not , if dead, stop self
*
* @param zNode node path
@@ -65,8 +57,6 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
if(!isExisted(zNode) || isExisted(deadServerPath)){
return true;
}
-
-
return false;
}
@@ -100,28 +90,6 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
}
/**
- * register server, if server already exists, return null.
- * @param zkNodeType zookeeper node type
- * @return register server path in zookeeper
- * @throws Exception errors
- */
- public String registerServer(ZKNodeType zkNodeType) throws Exception {
- String registerPath = null;
- String host = OSUtils.getHost();
- if(checkZKNodeExists(host, zkNodeType)){
- logger.error("register failure , {} server already
started on host : {}" ,
- zkNodeType.toString(), host);
- return registerPath;
- }
- registerPath = createZNodePath(zkNodeType, host);
-
- // handle dead server
- handleDeadServer(registerPath, zkNodeType,
Constants.DELETE_ZK_OP);
-
- return registerPath;
- }
-
- /**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
@@ -152,16 +120,6 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
}
-
-
- /**
- * for stop server
- * @param serverStoppable server stoppable interface
- */
- public void setStoppable(IStoppable serverStoppable){
- this.stoppable = serverStoppable;
- }
-
/**
* get active master num
* @return active master number
@@ -277,14 +235,6 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
/**
*
- * @return get master lock path
- */
- public String getWorkerLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS;
- }
-
- /**
- *
* @param zkNodeType zookeeper node type
* @return get zookeeper node parent path
*/
@@ -339,7 +289,7 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
* release mutex
* @param mutex mutex
*/
- public static void releaseMutex(InterProcessMutex mutex) {
+ public void releaseMutex(InterProcessMutex mutex) {
if (mutex != null){
try {
mutex.release();
@@ -387,18 +337,6 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
return pathArray[pathArray.length - 1];
}
- /**
- * acquire zk lock
- * @param zkClient zk client
- * @param zNodeLockPath zk lock path
- * @return zk lock
- * @throws Exception errors
- */
- public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String
zNodeLockPath)throws Exception{
- InterProcessMutex mutex = new InterProcessMutex(zkClient,
zNodeLockPath);
- mutex.acquire();
- return mutex;
- }
@Override
public String toString() {
@@ -407,7 +345,6 @@ public abstract class AbstractZKClient extends
ZookeeperCachedOperator {
", deadServerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
", masterZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
", workerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
- ", stoppable=" + stoppable +
'}';
}
}