This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
new 077f4d1 [1.3.6-prepare][Fix-#4840][worker] fix master fault tolerance when startup #4845 (#4916)
077f4d1 is described below
commit 077f4d120902288fd3a13f6e7cb3603d16b6f4c3
Author: Kirs <[email protected]>
AuthorDate: Tue Mar 9 15:58:58 2021 +0800
[1.3.6-prepare][Fix-#4840][worker] fix master fault tolerance when startup #4845 (#4916)
* [1.3.6-prepare][Fix-#4840][worker] fix master fault tolerance when startup #4845
* code style
---
.../server/master/MasterServer.java | 20 +-
.../dolphinscheduler/server/zk/ZKMasterClient.java | 628 +++++++++++----------
2 files changed, 328 insertions(+), 320 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index d863742..18882a2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -25,12 +25,14 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
+
+import javax.annotation.PostConstruct;
+
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,8 +42,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
-import javax.annotation.PostConstruct;
-
@@ -74,12 +74,6 @@ public class MasterServer {
private NettyRemotingServer nettyRemotingServer;
/**
- * master registry
- */
- @Autowired
- private MasterRegistry masterRegistry;
-
- /**
* zk master client
*/
@Autowired
@@ -117,14 +111,11 @@ public class MasterServer {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new
TaskKillResponseProcessor());
this.nettyRemotingServer.start();
- // register
- this.masterRegistry.registry();
-
// self tolerant
this.zkMasterClient.start();
- //
- masterSchedulerService.start();
+ // scheduler start
+ this.masterSchedulerService.start();
// start QuartzExecutors
// what system should do if exception
@@ -178,7 +169,6 @@ public class MasterServer {
//
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
- this.masterRegistry.unRegistry();
this.zkMasterClient.close();
//close quartz
try{
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index dbba51b..7f74c8c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -30,9 +31,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -45,314 +48,329 @@ import static org.apache.dolphinscheduler.common.Constants.*;
/**
- * zookeeper master client
- *
- * single instance
+ * zookeeper master client
+ * <p>
+ * single instance
*/
@Component
public class ZKMasterClient extends AbstractZKClient {
- /**
- * logger
- */
- private static final Logger logger =
LoggerFactory.getLogger(ZKMasterClient.class);
-
- /**
- * process service
- */
- @Autowired
- private ProcessService processService;
-
- public void start() {
-
- InterProcessMutex mutex = null;
- try {
- // create distributed lock with the root node path of
the lock space as /dolphinscheduler/lock/failover/master
- String znodeLock = getMasterStartUpLockPath();
- mutex = new InterProcessMutex(getZkClient(), znodeLock);
- mutex.acquire();
-
- // init system znode
- this.initSystemZNode();
-
- while (!checkZKNodeExists(OSUtils.getHost(),
ZKNodeType.MASTER)){
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
- }
- // startup tolerant
- if (getActiveMasterNum() == 1) {
- removeZKNodePath(null, ZKNodeType.MASTER, true);
- removeZKNodePath(null, ZKNodeType.WORKER, true);
- }
- registerListener();
- }catch (Exception e){
- logger.error("master start up exception",e);
- }finally {
- releaseMutex(mutex);
- }
- }
-
- @Override
- public void close(){
- super.close();
- }
-
- /**
- * handle path events that this class cares about
- * @param client zkClient
- * @param event path event
- * @param path zk path
- */
- @Override
- protected void dataChanged(CuratorFramework client, TreeCacheEvent
event, String path) {
- //monitor master
-
if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){
- handleMasterEvent(event,path);
- }else
if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
- //monitor worker
- handleWorkerEvent(event,path);
- }
- }
-
- /**
- * remove zookeeper node path
- *
- * @param path zookeeper node path
- * @param zkNodeType zookeeper node type
- * @param failover is failover
- */
- private void removeZKNodePath(String path, ZKNodeType zkNodeType,
boolean failover) {
- logger.info("{} node deleted : {}", zkNodeType.toString(),
path);
- InterProcessMutex mutex = null;
- try {
- String failoverPath = getFailoverLockPath(zkNodeType);
- // create a distributed lock
- mutex = new InterProcessMutex(getZkClient(),
failoverPath);
- mutex.acquire();
-
- String serverHost = null;
- if(StringUtils.isNotEmpty(path)){
- serverHost = getHostByEventDataPath(path);
- if(StringUtils.isEmpty(serverHost)){
- logger.error("server down error:
unknown path: {}", path);
- return;
- }
- // handle dead server
- handleDeadServer(path, zkNodeType,
Constants.ADD_ZK_OP);
- }
- //failover server
- if(failover){
- failoverServerWhenDown(serverHost, zkNodeType);
- }
- }catch (Exception e){
- logger.error("{} server failover failed.",
zkNodeType.toString());
- logger.error("failover exception ",e);
- }
- finally {
- releaseMutex(mutex);
- }
- }
-
- /**
- * failover server when server down
- *
- * @param serverHost server host
- * @param zkNodeType zookeeper node type
- * @throws Exception exception
- */
- private void failoverServerWhenDown(String serverHost, ZKNodeType
zkNodeType) throws Exception {
- switch (zkNodeType) {
- case MASTER:
- failoverMaster(serverHost);
- break;
- case WORKER:
- failoverWorker(serverHost, true);
- default:
- break;
- }
- }
-
- /**
- * get failover lock path
- *
- * @param zkNodeType zookeeper node type
- * @return fail over lock path
- */
- private String getFailoverLockPath(ZKNodeType zkNodeType){
-
- switch (zkNodeType){
- case MASTER:
- return getMasterFailoverLockPath();
- case WORKER:
- return getWorkerFailoverLockPath();
- default:
- return "";
- }
- }
-
- /**
- * monitor master
- * @param event event
- * @param path path
- */
- public void handleMasterEvent(TreeCacheEvent event, String path){
- switch (event.getType()) {
- case NODE_ADDED:
- logger.info("master node added : {}", path);
- break;
- case NODE_REMOVED:
- removeZKNodePath(path, ZKNodeType.MASTER, true);
- break;
- default:
- break;
- }
- }
-
- /**
- * monitor worker
- * @param event event
- * @param path path
- */
- public void handleWorkerEvent(TreeCacheEvent event, String path){
- switch (event.getType()) {
- case NODE_ADDED:
- logger.info("worker node added : {}", path);
- break;
- case NODE_REMOVED:
- logger.info("worker node deleted : {}", path);
- removeZKNodePath(path, ZKNodeType.WORKER, true);
- break;
- default:
- break;
- }
- }
-
- /**
- * task needs failover if task start before worker starts
- *
- * @param taskInstance task instance
- * @return true if task instance need fail over
- */
- private boolean checkTaskInstanceNeedFailover(TaskInstance
taskInstance) throws Exception {
-
- boolean taskNeedFailover = true;
-
- //now no host will execute this task instance,so no need to
failover the task
- if(taskInstance.getHost() == null){
- return false;
- }
-
- // if the worker node exists in zookeeper, we must check the
task starts after the worker
- if(checkZKNodeExists(taskInstance.getHost(),
ZKNodeType.WORKER)){
- //if task start after worker starts, there is no need
to failover the task.
- if(checkTaskAfterWorkerStart(taskInstance)){
- taskNeedFailover = false;
- }
- }
- return taskNeedFailover;
- }
-
- /**
- * check task start after the worker server starts.
- *
- * @param taskInstance task instance
- * @return true if task instance start time after worker server start
date
- */
- private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
- if(StringUtils.isEmpty(taskInstance.getHost())){
- return false;
- }
- Date workerServerStartDate = null;
- List<Server> workerServers = getServersList(ZKNodeType.WORKER);
- for(Server workerServer : workerServers){
- if(taskInstance.getHost().equals(workerServer.getHost()
+ Constants.COLON + workerServer.getPort())){
- workerServerStartDate =
workerServer.getCreateTime();
- break;
- }
- }
-
- if(workerServerStartDate != null){
- return
taskInstance.getStartTime().after(workerServerStartDate);
- }else{
- return false;
- }
- }
-
- /**
- * failover worker tasks
- *
- * 1. kill yarn job if there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. failover all tasks when workerHost is null
- * @param workerHost worker host
- */
-
- /**
- * failover worker tasks
- *
- * 1. kill yarn job if there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. failover all tasks when workerHost is null
- * @param workerHost worker host
- * @param needCheckWorkerAlive need check worker alive
- * @throws Exception exception
- */
- private void failoverWorker(String workerHost, boolean
needCheckWorkerAlive) throws Exception {
- logger.info("start worker[{}] failover ...", workerHost);
-
- List<TaskInstance> needFailoverTaskInstanceList =
processService.queryNeedFailoverTaskInstances(workerHost);
- for(TaskInstance taskInstance : needFailoverTaskInstanceList){
- if(needCheckWorkerAlive){
-
if(!checkTaskInstanceNeedFailover(taskInstance)){
- continue;
- }
- }
-
- ProcessInstance processInstance =
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
- if(processInstance != null){
-
taskInstance.setProcessInstance(processInstance);
- }
-
- TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
-
.buildTaskInstanceRelatedInfo(taskInstance)
-
.buildProcessInstanceRelatedInfo(processInstance)
- .create();
- // only kill yarn job if exists , the local thread has
exited
- ProcessUtils.killYarnJob(taskExecutionContext);
-
-
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- processService.saveTaskInstance(taskInstance);
- }
- logger.info("end worker[{}] failover ...", workerHost);
- }
-
- /**
- * failover master tasks
- *
- * @param masterHost master host
- */
- private void failoverMaster(String masterHost) {
- logger.info("start master failover ...");
-
- List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
-
- logger.info("failover process list size:{} ",
needFailoverProcessInstanceList.size());
- //updateProcessInstance host is null and insert into command
- for(ProcessInstance processInstance :
needFailoverProcessInstanceList){
- logger.info("failover process instance id: {} host:{}",
- processInstance.getId(),
processInstance.getHost());
- if(Constants.NULL.equals(processInstance.getHost()) ){
- continue;
- }
-
processService.processNeedFailoverProcessInstances(processInstance);
- }
-
- logger.info("master failover end");
- }
-
- public InterProcessMutex blockAcquireMutex() throws Exception {
- InterProcessMutex mutex = new InterProcessMutex(getZkClient(),
getMasterLockPath());
- mutex.acquire();
- return mutex;
- }
+    /**
+     * logger
+     */
+    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
+
+    /**
+     * process service, used to query and persist the process/task instances handled during failover
+     */
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * master registry: start() calls masterRegistry.registry() and close() calls
+     * masterRegistry.unRegistry(), so this client owns the master's registration lifecycle
+     */
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+
+    /**
+     * Start the master's zookeeper client.
+     * <p>
+     * While holding the master start-up lock, this:
+     * <ol>
+     * <li>registers this master,</li>
+     * <li>initializes the system znodes,</li>
+     * <li>waits until this host's master node is visible,</li>
+     * <li>runs the start-up tolerance step when this is the only active master,</li>
+     * <li>registers the tree-cache listener.</li>
+     * </ol>
+     * Any exception is logged and not rethrown. The lock is always released.
+     */
+    public void start() {
+
+        InterProcessMutex mutex = null;
+        try {
+            // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
+            // NOTE(review): the actual path comes from getMasterStartUpLockPath(); the path in the
+            // comment above may be stale - confirm
+            String znodeLock = getMasterStartUpLockPath();
+            mutex = new InterProcessMutex(getZkClient(), znodeLock);
+            mutex.acquire();
+
+            // Master registry
+            // registering while holding the start-up lock keeps registration and the
+            // active-master count check below inside the same critical section
+            masterRegistry.registry();
+
+            // init system znode
+            this.initSystemZNode();
+
+            // block until this host's master node is visible in zookeeper
+            while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)) {
+                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+            }
+            // startup tolerant
+            // NOTE(review): with a null path, removeZKNodePath passes a null host to
+            // failoverServerWhenDown, which returns early for an empty host. These two calls
+            // therefore only take the failover locks and perform no failover - confirm this is intended
+            if (getActiveMasterNum() == 1) {
+                removeZKNodePath(null, ZKNodeType.MASTER, true);
+                removeZKNodePath(null, ZKNodeType.WORKER, true);
+            }
+            registerListener();
+        } catch (Exception e) {
+            logger.error("master start up exception", e);
+        } finally {
+            releaseMutex(mutex);
+        }
+    }
+
+    /**
+     * Unregister this master, then close the underlying zookeeper client.
+     * <p>
+     * Unregistration runs before super.close(). This matches the order MasterServer.close() used
+     * before registration moved into this class (unRegistry before zkMasterClient.close()), so
+     * unregistration never runs after this client has already been closed. The client is still
+     * closed if unregistration throws.
+     */
+    @Override
+    public void close() {
+        try {
+            masterRegistry.unRegistry();
+        } finally {
+            super.close();
+        }
+    }
+
+    /**
+     * Dispatch a tree-cache event by znode path: paths under the master parent go to
+     * handleMasterEvent, paths under the worker parent go to handleWorkerEvent, and anything
+     * else is ignored.
+     *
+     * @param client zkClient
+     * @param event path event
+     * @param path zk path
+     */
+    @Override
+    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
+        String masterPrefix = getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH;
+        if (path.startsWith(masterPrefix)) {
+            handleMasterEvent(event, path);
+            return;
+        }
+        String workerPrefix = getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH;
+        if (path.startsWith(workerPrefix)) {
+            handleWorkerEvent(event, path);
+        }
+    }
+
+    /**
+     * Handle the removal of a master or worker znode.
+     * <p>
+     * While holding the failover lock for {@code zkNodeType}:
+     * <ul>
+     * <li>If {@code path} is non-empty, the server host is resolved from it. An unresolvable path
+     * is logged and ignored; otherwise handleDeadServer is called with ADD_ZK_OP.</li>
+     * <li>If {@code failover} is true, failoverServerWhenDown is then invoked for that host.</li>
+     * </ul>
+     * Exceptions are logged rather than rethrown, and the lock is always released.
+     *
+     * @param path zookeeper node path; may be null, in which case no host is resolved
+     * @param zkNodeType zookeeper node type
+     * @param failover is failover
+     */
+    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
+        logger.info("{} node deleted : {}", zkNodeType.toString(), path);
+        InterProcessMutex mutex = null;
+        try {
+            String failoverPath = getFailoverLockPath(zkNodeType);
+            // create a distributed lock
+            mutex = new InterProcessMutex(getZkClient(), failoverPath);
+            mutex.acquire();
+
+            String serverHost = null;
+            if (StringUtils.isNotEmpty(path)) {
+                serverHost = getHostByEventDataPath(path);
+                if (StringUtils.isEmpty(serverHost)) {
+                    logger.error("server down error: unknown path: {}", path);
+                    // the finally block below still releases the lock on this early return
+                    return;
+                }
+                // handle dead server
+                handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
+            }
+            //failover server
+            // serverHost is still null here when path was empty
+            if (failover) {
+                failoverServerWhenDown(serverHost, zkNodeType);
+            }
+        } catch (Exception e) {
+            logger.error("{} server failover failed.", zkNodeType.toString());
+            logger.error("failover exception ", e);
+        } finally {
+            releaseMutex(mutex);
+        }
+    }
+
+    /**
+     * Fail over the work of a server that went down.
+     * <p>
+     * Does nothing when {@code serverHost} is empty. removeZKNodePath passes a null host when it
+     * is called with a null path.
+     *
+     * @param serverHost server host
+     * @param zkNodeType zookeeper node type
+     * @throws Exception exception
+     */
+    private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
+        if (StringUtils.isEmpty(serverHost)) {
+            return;
+        }
+        switch (zkNodeType) {
+            case MASTER:
+                failoverMaster(serverHost);
+                break;
+            case WORKER:
+                failoverWorker(serverHost, true);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Resolve the zookeeper lock path that serializes failover for the given node type.
+     *
+     * @param zkNodeType zookeeper node type
+     * @return the master or worker failover lock path, or an empty string for any other type
+     */
+    private String getFailoverLockPath(ZKNodeType zkNodeType) {
+        String lockPath;
+        switch (zkNodeType) {
+            case MASTER:
+                lockPath = getMasterFailoverLockPath();
+                break;
+            case WORKER:
+                lockPath = getWorkerFailoverLockPath();
+                break;
+            default:
+                lockPath = "";
+                break;
+        }
+        return lockPath;
+    }
+
+    /**
+     * React to a change under the master znode parent.
+     * An added node is only logged. A removed node triggers removeZKNodePath with failover
+     * enabled. Other event types are ignored.
+     *
+     * @param event event
+     * @param path path
+     */
+    public void handleMasterEvent(TreeCacheEvent event, String path) {
+        final TreeCacheEvent.Type eventType = event.getType();
+        switch (eventType) {
+            case NODE_REMOVED:
+                removeZKNodePath(path, ZKNodeType.MASTER, true);
+                break;
+            case NODE_ADDED:
+                logger.info("master node added : {}", path);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * React to a change under the worker znode parent.
+     * An added node is only logged. A removed node is logged and then triggers removeZKNodePath
+     * with failover enabled. Other event types are ignored.
+     *
+     * @param event event
+     * @param path path
+     */
+    public void handleWorkerEvent(TreeCacheEvent event, String path) {
+        final TreeCacheEvent.Type eventType = event.getType();
+        switch (eventType) {
+            case NODE_REMOVED:
+                logger.info("worker node deleted : {}", path);
+                removeZKNodePath(path, ZKNodeType.WORKER, true);
+                break;
+            case NODE_ADDED:
+                logger.info("worker node added : {}", path);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Decide whether a task instance must be failed over.
+     * <p>
+     * A task with no host needs no failover. Otherwise it needs failover unless its worker znode
+     * still exists and the task started after that worker started.
+     *
+     * @param taskInstance task instance
+     * @return true if task instance need fail over
+     */
+    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
+        // no host means no worker is executing this task, so there is nothing to fail over
+        if (taskInstance.getHost() == null) {
+            return false;
+        }
+        // the start-time comparison is only evaluated when the worker node is still registered
+        boolean ownedByLiveWorker = checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)
+                && checkTaskAfterWorkerStart(taskInstance);
+        return !ownedByLiveWorker;
+    }
+
+    /**
+     * Check whether a task instance started after its worker server started.
+     * <p>
+     * The worker is located among the registered worker servers by matching {@code host:port}
+     * against the task's host. Returns false when:
+     * <ul>
+     * <li>the task has no host,</li>
+     * <li>the task has no start time,</li>
+     * <li>no matching worker is registered.</li>
+     * </ul>
+     *
+     * @param taskInstance task instance
+     * @return true if task instance start time after worker server start date
+     */
+    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
+        String taskHost = taskInstance.getHost();
+        if (StringUtils.isEmpty(taskHost)) {
+            return false;
+        }
+        // Without a start time we cannot show the task started after the worker. Returning false
+        // (so the caller fails the task over) avoids a NullPointerException, which would abort
+        // failover for every remaining task in failoverWorker's loop.
+        Date taskStartTime = taskInstance.getStartTime();
+        if (taskStartTime == null) {
+            return false;
+        }
+        Date workerServerStartDate = null;
+        for (Server workerServer : getServersList(ZKNodeType.WORKER)) {
+            if (taskHost.equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) {
+                workerServerStartDate = workerServer.getCreateTime();
+                break;
+            }
+        }
+        return workerServerStartDate != null && taskStartTime.after(workerServerStartDate);
+    }
+
+    /**
+     * failover worker tasks
+     * <p>
+     * For every task instance returned by processService.queryNeedFailoverTaskInstances(workerHost):
+     * <ol>
+     * <li>when {@code needCheckWorkerAlive} is set, skip it if checkTaskInstanceNeedFailover
+     * says it does not need failover;</li>
+     * <li>attach its process instance (when found) and kill the related yarn job, if any;</li>
+     * <li>set its state to NEED_FAULT_TOLERANCE and save it.</li>
+     * </ol>
+     * NOTE(review): an earlier comment claimed all tasks are failed over when workerHost is null.
+     * That depends on queryNeedFailoverTaskInstances and cannot be confirmed here. The only
+     * caller visible in this class, failoverServerWhenDown, returns early on an empty host.
+     *
+     * @param workerHost worker host
+     * @param needCheckWorkerAlive need check worker alive
+     * @throws Exception exception
+     */
+    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
+        logger.info("start worker[{}] failover ...", workerHost);
+
+        List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
+        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+            if (needCheckWorkerAlive) {
+                // leave tasks that are still owned by a live worker alone
+                if (!checkTaskInstanceNeedFailover(taskInstance)) {
+                    continue;
+                }
+            }
+
+            ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+            if (processInstance != null) {
+                taskInstance.setProcessInstance(processInstance);
+            }
+
+            // NOTE(review): processInstance may still be null here and is passed to
+            // buildProcessInstanceRelatedInfo anyway - confirm the builder tolerates null, otherwise
+            // the resulting exception aborts failover for the remaining tasks
+            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                    .buildTaskInstanceRelatedInfo(taskInstance)
+                    .buildProcessInstanceRelatedInfo(processInstance)
+                    .create();
+            // only kill yarn job if exists , the local thread has exited
+            ProcessUtils.killYarnJob(taskExecutionContext);
+
+            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+            processService.saveTaskInstance(taskInstance);
+        }
+        logger.info("end worker[{}] failover ...", workerHost);
+    }
+
+    /**
+     * Fail over the process instances that were run by a master that went down.
+     * <p>
+     * Every instance returned by processService.queryNeedFailoverProcessInstances(masterHost) is
+     * logged. Instances whose host is not the Constants.NULL marker are handed to
+     * processService.processNeedFailoverProcessInstances.
+     *
+     * @param masterHost master host
+     */
+    private void failoverMaster(String masterHost) {
+        logger.info("start master failover ...");
+
+        List<ProcessInstance> instances = processService.queryNeedFailoverProcessInstances(masterHost);
+        logger.info("failover process list size:{} ", instances.size());
+
+        // per the original note, processNeedFailoverProcessInstances resets the instance host and
+        // inserts a command (behavior lives in ProcessService - not verifiable here)
+        for (ProcessInstance instance : instances) {
+            String instanceHost = instance.getHost();
+            logger.info("failover process instance id: {} host:{}", instance.getId(), instanceHost);
+            if (!Constants.NULL.equals(instanceHost)) {
+                processService.processNeedFailoverProcessInstances(instance);
+            }
+        }
+
+        logger.info("master failover end");
+    }
+
+    /**
+     * Acquire the master lock, blocking until it is available.
+     *
+     * @return the acquired mutex; it is not released here, so the caller must release it
+     * @throws Exception if acquiring the lock fails
+     */
+    public InterProcessMutex blockAcquireMutex() throws Exception {
+        CuratorFramework client = getZkClient();
+        String lockPath = getMasterLockPath();
+        InterProcessMutex masterLock = new InterProcessMutex(client, lockPath);
+        masterLock.acquire();
+        return masterLock;
+    }
}
\ No newline at end of file