This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 055bb28  worker fault tolerance modify (#2212)
055bb28 is described below

commit 055bb28de41d58183d34bf51a880280503d93099
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue Mar 17 18:45:39 2020 +0800

    worker fault tolerance modify (#2212)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replace
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    * ProcedureTask optimization
    
    * MasterSchedulerService modify
    
    * TaskUpdateQueueConsumer modify
    
    * test
    
    * DataxTask process run debug
    
    * DataxTask process run debug
    
    * add protobuf dependency,MR、Spark task etc need this
    
    * TaskUpdateQueueConsumer modify
    
    * TaskExecutionContextBuilder set TaskInstance workgroup
    
    * WorkerGroupService queryAllGroup modify
    query available work group
    
    * 1,get workergroup from zk modify
    2,SpringConnectionFactory repeat load modify
    
    * master and worker register ip  use OSUtils.getHost()
    
    * ProcessInstance host set ip:port format
    
    * worker fault tolerance modify
    
    * Constants and .env modify
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../api/controller/LoggerController.java           |   8 +-
 .../apache/dolphinscheduler/common/Constants.java  | 124 +--------------------
 .../builder/TaskExecutionContextBuilder.java       |   1 +
 .../server/utils/ProcessUtils.java                 |  21 ++--
 .../dolphinscheduler/server/zk/ZKMasterClient.java |  45 ++++----
 .../service/zk/AbstractZKClient.java               |  58 +++-------
 .../service/zk/ZookeeperCachedOperator.java        |   2 +-
 dolphinscheduler-ui/.env                           |   2 +-
 8 files changed, 59 insertions(+), 202 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
index 802f09f..eefd6ba 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
@@ -60,14 +60,14 @@ public class LoggerController extends BaseController {
      */
     @ApiOperation(value = "queryLog", notes= "QUERY_TASK_INSTANCE_LOG_NOTES")
     @ApiImplicitParams({
-            @ApiImplicitParam(name = "taskInstId", value = "TASK_ID", dataType 
= "Int", example = "100"),
+            @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", 
dataType = "Int", example = "100"),
             @ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", 
dataType ="Int", example = "100"),
             @ApiImplicitParam(name = "limit", value = "LIMIT", dataType 
="Int", example = "100")
     })
     @GetMapping(value = "/detail")
     @ResponseStatus(HttpStatus.OK)
     public Result queryLog(@ApiIgnore @RequestAttribute(value = 
Constants.SESSION_USER) User loginUser,
-                           @RequestParam(value = "taskInstId") int 
taskInstanceId,
+                           @RequestParam(value = "taskInstanceId") int 
taskInstanceId,
                            @RequestParam(value = "skipLineNum") int skipNum,
                            @RequestParam(value = "limit") int limit) {
         try {
@@ -91,12 +91,12 @@ public class LoggerController extends BaseController {
      */
     @ApiOperation(value = "downloadTaskLog", notes= 
"DOWNLOAD_TASK_INSTANCE_LOG_NOTES")
     @ApiImplicitParams({
-            @ApiImplicitParam(name = "taskInstId", value = "TASK_ID",dataType 
= "Int", example = "100")
+            @ApiImplicitParam(name = "taskInstanceId", value = 
"TASK_ID",dataType = "Int", example = "100")
     })
     @GetMapping(value = "/download-log")
     @ResponseBody
     public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = 
Constants.SESSION_USER) User loginUser,
-                                          @RequestParam(value = "taskInstId") 
int taskInstanceId) {
+                                          @RequestParam(value = 
"taskInstanceId") int taskInstanceId) {
         try {
             byte[] logBytes = loggerService.getLogBytes(taskInstanceId);
             return ResponseEntity
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 67ce5fd..2aded0f 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -119,19 +119,14 @@ public final class Constants {
     public static final String RES_UPLOAD_STARTUP_TYPE = 
"res.upload.startup.type";
 
     /**
-     * zookeeper quorum
-     */
-    public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
-
-    /**
      * MasterServer directory registered in zookeeper
      */
-    public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/masters";
+    public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = 
"/nodes/masters";
 
     /**
      * WorkerServer directory registered in zookeeper
      */
-    public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "/workers";
+    public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = 
"/nodes/worker";
 
     /**
      * all servers directory registered in zookeeper
@@ -143,10 +138,6 @@ public final class Constants {
      */
     public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = 
"/lock/masters";
 
-    /**
-     * WorkerServer lock directory registered in zookeeper
-     */
-    public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = 
"/lock/workers";
 
     /**
      * MasterServer failover directory registered in zookeeper
@@ -163,10 +154,6 @@ public final class Constants {
      */
     public static final String 
ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = 
"/lock/failover/startup-masters";
 
-    /**
-     * need send warn times when master server or worker server failover
-     */
-    public static final int DOLPHINSCHEDULER_WARN_TIMES_FAILOVER = 3;
 
     /**
      * comma ,
@@ -203,37 +190,6 @@ public final class Constants {
      */
     public static final String EQUAL_SIGN = "=";
 
-    /**
-     * ZOOKEEPER_SESSION_TIMEOUT
-     */
-    public static final String ZOOKEEPER_SESSION_TIMEOUT = 
"zookeeper.session.timeout";
-
-    public static final String ZOOKEEPER_CONNECTION_TIMEOUT = 
"zookeeper.connection.timeout";
-
-    public static final String ZOOKEEPER_RETRY_SLEEP = "zookeeper.retry.sleep";
-    public static final String ZOOKEEPER_RETRY_BASE_SLEEP = 
"zookeeper.retry.base.sleep";
-    public static final String ZOOKEEPER_RETRY_MAX_SLEEP = 
"zookeeper.retry.max.sleep";
-
-    public static final String ZOOKEEPER_RETRY_MAXTIME = 
"zookeeper.retry.maxtime";
-
-
-    public static final String MASTER_HEARTBEAT_INTERVAL = 
"master.heartbeat.interval";
-
-    public static final String MASTER_EXEC_THREADS = "master.exec.threads";
-
-    public static final String MASTER_EXEC_TASK_THREADS = 
"master.exec.task.number";
-
-
-    public static final String MASTER_COMMIT_RETRY_TIMES = 
"master.task.commit.retryTimes";
-
-    public static final String MASTER_COMMIT_RETRY_INTERVAL = 
"master.task.commit.interval";
-
-
-    public static final String WORKER_EXEC_THREADS = "worker.exec.threads";
-
-    public static final String WORKER_HEARTBEAT_INTERVAL = 
"worker.heartbeat.interval";
-
-    public static final String WORKER_FETCH_TASK_NUM = "worker.fetch.task.num";
 
     public static final String WORKER_MAX_CPULOAD_AVG = 
"worker.max.cpuload.avg";
 
@@ -244,17 +200,6 @@ public final class Constants {
     public static final String MASTER_RESERVED_MEMORY = 
"master.reserved.memory";
 
 
-    /**
-     * dolphinscheduler tasks queue
-     */
-    public static final String DOLPHINSCHEDULER_TASKS_QUEUE = "tasks_queue";
-
-    /**
-     * dolphinscheduler need kill tasks queue
-     */
-    public static final String DOLPHINSCHEDULER_TASKS_KILL = "tasks_kill";
-
-    public static final String ZOOKEEPER_DOLPHINSCHEDULER_ROOT = 
"zookeeper.dolphinscheduler.root";
 
     public static final String SCHEDULER_QUEUE_IMPL = 
"dolphinscheduler.queue.impl";
 
@@ -351,26 +296,6 @@ public final class Constants {
 
 
     /**
-     * heartbeat threads number
-     */
-    public static final int DEFAUL_WORKER_HEARTBEAT_THREAD_NUM = 1;
-
-    /**
-     * heartbeat interval
-     */
-    public static final int DEFAULT_WORKER_HEARTBEAT_INTERVAL = 60;
-
-    /**
-     * worker fetch task number
-     */
-    public static final int DEFAULT_WORKER_FETCH_TASK_NUM = 1;
-
-    /**
-     * worker execute threads number
-     */
-    public static final int DEFAULT_WORKER_EXEC_THREAD_NUM = 10;
-
-    /**
      * master cpu load
      */
     public static final int DEFAULT_MASTER_CPU_LOAD = 
Runtime.getRuntime().availableProcessors() * 2;
@@ -391,16 +316,6 @@ public final class Constants {
     public static final double DEFAULT_WORKER_RESERVED_MEMORY = 
OSUtils.totalMemorySize() / 10;
 
 
-    /**
-     * master execute threads number
-     */
-    public static final int DEFAULT_MASTER_EXEC_THREAD_NUM = 100;
-
-
-    /**
-     * default master concurrent task execute num
-     */
-    public static final int DEFAULT_MASTER_TASK_EXEC_NUM = 20;
 
     /**
      * default log cache rows num,output when reach the number
@@ -408,34 +323,12 @@ public final class Constants {
     public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16;
 
     /**
-     * log flush interval,output when reach the interval
+     * log flush interval?output when reach the interval
      */
     public static final int DEFAULT_LOG_FLUSH_INTERVAL = 1000;
 
 
     /**
-     * default master heartbeat thread number
-     */
-    public static final int DEFAULT_MASTER_HEARTBEAT_THREAD_NUM = 1;
-
-
-    /**
-     * default master heartbeat interval
-     */
-    public static final int DEFAULT_MASTER_HEARTBEAT_INTERVAL = 60;
-
-    /**
-     * default master commit retry times
-     */
-    public static final int DEFAULT_MASTER_COMMIT_RETRY_TIMES = 5;
-
-
-    /**
-     * default master commit retry interval
-     */
-    public static final int DEFAULT_MASTER_COMMIT_RETRY_INTERVAL = 3000;
-
-    /**
      * time unit secong to minutes
      */
     public static final int SEC_2_MINUTES_TIME_UNIT = 60;
@@ -805,7 +698,6 @@ public final class Constants {
     public static final String ALIAS = "alias";
     public static final String CONTENT = "content";
     public static final String DEPENDENT_SPLIT = ":||";
-    public static final String DEPENDENT_ALL = "ALL";
 
 
     /**
@@ -864,7 +756,7 @@ public final class Constants {
      */
     public static final String HIVE_CONF = "hiveconf:";
 
-    //flink 任务
+    //flink ??
     public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
     public static final String FLINK_RUN_MODE = "-m";
     public static final String FLINK_YARN_SLOT = "-ys";
@@ -899,26 +791,20 @@ public final class Constants {
 
     /**
      * data total
-     * 数据总数
      */
     public  static final String COUNT = "count";
 
     /**
      * page size
-     * 每页数据条数
      */
     public  static final String PAGE_SIZE = "pageSize";
 
     /**
      * current page no
-     * 当前页码
      */
     public  static final String PAGE_NUMBER = "pageNo";
 
-    /**
-     * result
-     */
-    public static final String RESULT = "result";
+
 
     /**
      *
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 08c105a..fc60e88 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -49,6 +49,7 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
         taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
         taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
+        taskExecutionContext.setHost(taskInstance.getHost());
         return this;
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index f29e7df..12cd66f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public class ProcessUtils {
       allowAmbiguousCommands = true;
       String value = 
System.getProperty("jdk.lang.Process.allowAmbiguousCommands");
       if (value != null) {
-          allowAmbiguousCommands = !"false".equalsIgnoreCase(value);
+        allowAmbiguousCommands = !"false".equalsIgnoreCase(value);
       }
     }
     if (allowAmbiguousCommands) {
@@ -68,7 +69,7 @@ public class ProcessUtils {
       String executablePath = new File(cmd[0]).getPath();
 
       if (needsEscaping(VERIFICATION_LEGACY, executablePath)) {
-          executablePath = quoteString(executablePath);
+        executablePath = quoteString(executablePath);
       }
 
       cmdstr = createCommandLine(
@@ -81,7 +82,7 @@ public class ProcessUtils {
 
         StringBuilder join = new StringBuilder();
         for (String s : cmd) {
-            join.append(s).append(' ');
+          join.append(s).append(' ');
         }
 
         cmd = getTokensFromCommand(join.toString());
@@ -89,7 +90,7 @@ public class ProcessUtils {
 
         // Check new executable name once more
         if (security != null) {
-            security.checkExec(executablePath);
+          security.checkExec(executablePath);
         }
       }
 
@@ -147,7 +148,7 @@ public class ProcessUtils {
     ArrayList<String> matchList = new ArrayList<>(8);
     Matcher regexMatcher = LazyPattern.PATTERN.matcher(command);
     while (regexMatcher.find()) {
-        matchList.add(regexMatcher.group());
+      matchList.add(regexMatcher.group());
     }
     return matchList.toArray(new String[matchList.size()]);
   }
@@ -323,9 +324,9 @@ public class ProcessUtils {
     try {
       int processId = taskExecutionContext.getProcessId();
       if(processId == 0 ){
-          logger.error("process kill failed, process id :{}, task id:{}",
-                  processId, taskExecutionContext.getTaskInstanceId());
-          return ;
+        logger.error("process kill failed, process id :{}, task id:{}",
+                processId, taskExecutionContext.getTaskInstanceId());
+        return ;
       }
 
       String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
@@ -379,7 +380,9 @@ public class ProcessUtils {
       String log = null;
       try {
         logClient = new LogClientService();
-        log = logClient.viewLog(taskExecutionContext.getHost(), 
Constants.RPC_PORT, taskExecutionContext.getLogPath());
+        log = 
logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
+                Constants.RPC_PORT,
+                taskExecutionContext.getLogPath());
       } finally {
         if(logClient != null){
           logClient.close();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index a59cf3e..0e9a839 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -59,7 +59,7 @@ public class ZKMasterClient extends AbstractZKClient {
        @Autowired
        private ProcessService processService;
 
-    public void start() {
+       public void start() {
 
                InterProcessMutex mutex = null;
                try {
@@ -71,7 +71,7 @@ public class ZKMasterClient extends AbstractZKClient {
                        // init system znode
                        this.initSystemZNode();
 
-                       // check if fault tolerance is required,failure and 
tolerance
+                       // check if fault tolerance is required?failure and 
tolerance
                        if (getActiveMasterNum() == 1) {
                                failoverWorker(null, true);
                                failoverMaster(null);
@@ -146,8 +146,8 @@ public class ZKMasterClient extends AbstractZKClient {
         * @throws Exception    exception
         */
        private void failoverServerWhenDown(String serverHost, ZKNodeType 
zkNodeType) throws Exception {
-           if(StringUtils.isEmpty(serverHost)){
-               return ;
+               if(StringUtils.isEmpty(serverHost)){
+                       return ;
                }
                switch (zkNodeType){
                        case MASTER:
@@ -217,7 +217,7 @@ public class ZKMasterClient extends AbstractZKClient {
 
        /**
         * task needs failover if task start before worker starts
-     *
+        *
         * @param taskInstance task instance
         * @return true if task instance need fail over
         */
@@ -231,10 +231,10 @@ public class ZKMasterClient extends AbstractZKClient {
                }
 
                // if the worker node exists in zookeeper, we must check the 
task starts after the worker
-           if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
-               //if task start after worker starts, there is no need to 
failover the task.
-               if(checkTaskAfterWorkerStart(taskInstance)){
-                   taskNeedFailover = false;
+               if(checkZKNodeExists(taskInstance.getHost(), 
ZKNodeType.WORKER)){
+                       //if task start after worker starts, there is no need 
to failover the task.
+                       if(checkTaskAfterWorkerStart(taskInstance)){
+                               taskNeedFailover = false;
                        }
                }
                return taskNeedFailover;
@@ -247,15 +247,15 @@ public class ZKMasterClient extends AbstractZKClient {
         * @return true if task instance start time after worker server start 
date
         */
        private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
-           if(StringUtils.isEmpty(taskInstance.getHost())){
-               return false;
+               if(StringUtils.isEmpty(taskInstance.getHost())){
+                       return false;
                }
-           Date workerServerStartDate = null;
-           List<Server> workerServers = getServersList(ZKNodeType.WORKER);
-           for(Server workerServer : workerServers){
-               if(workerServer.getHost().equals(taskInstance.getHost())){
-                   workerServerStartDate = workerServer.getCreateTime();
-                   break;
+               Date workerServerStartDate = null;
+               List<Server> workerServers = getServersList(ZKNodeType.WORKER);
+               for(Server workerServer : workerServers){
+                       
if(workerServer.getHost().equals(taskInstance.getHost())){
+                               workerServerStartDate = 
workerServer.getCreateTime();
+                               break;
                        }
                }
 
@@ -271,7 +271,7 @@ public class ZKMasterClient extends AbstractZKClient {
         *
         * 1. kill yarn job if there are yarn jobs in tasks.
         * 2. change task state from running to need failover.
-     * 3. failover all tasks when workerHost is null
+        * 3. failover all tasks when workerHost is null
         * @param workerHost worker host
         */
 
@@ -293,7 +293,7 @@ public class ZKMasterClient extends AbstractZKClient {
                        if(needCheckWorkerAlive){
                                
if(!checkTaskInstanceNeedFailover(taskInstance)){
                                        continue;
-                }
+                               }
                        }
 
                        ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
@@ -304,7 +304,6 @@ public class ZKMasterClient extends AbstractZKClient {
                        TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
                                        
.buildTaskInstanceRelatedInfo(taskInstance)
                                        
.buildProcessInstanceRelatedInfo(processInstance)
-                                       .buildProcessDefinitionRelatedInfo(null)
                                        .create();
                        // only kill yarn job if exists , the local thread has 
exited
                        ProcessUtils.killYarnJob(taskExecutionContext);
@@ -334,9 +333,9 @@ public class ZKMasterClient extends AbstractZKClient {
        }
 
        public InterProcessMutex blockAcquireMutex() throws Exception {
-        InterProcessMutex mutex = new InterProcessMutex(getZkClient(), 
getMasterLockPath());
-        mutex.acquire();
-        return mutex;
+               InterProcessMutex mutex = new InterProcessMutex(getZkClient(), 
getMasterLockPath());
+               mutex.acquire();
+               return mutex;
        }
 
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 0b9fbe4..106dbc1 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -38,56 +38,24 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
 
        private static final Logger logger = 
LoggerFactory.getLogger(AbstractZKClient.class);
 
+
        /**
-        *      check dead server or not , if dead, stop self
-        *
-        * @param zNode          node path
-        * @param serverType master or worker prefix
-        * @return  true if not exists
-        * @throws Exception errors
+        *  remove dead server by host
+        * @param host host
+        * @param serverType serverType
+        * @throws Exception
         */
-       protected boolean checkIsDeadServer(String zNode, String serverType) 
throws Exception{
-               //ip_sequenceno
-               String[] zNodesPath = zNode.split("\\/");
-               String ipSeqNo = zNodesPath[zNodesPath.length - 1];
-
-               String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX 
: WORKER_PREFIX;
-               String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH 
+ type + UNDERLINE + ipSeqNo;
-
-               if(!isExisted(zNode) || isExisted(deadServerPath)){
-                       return true;
-               }
-               return false;
-       }
-
-
        public void removeDeadServerByHost(String host, String serverType) 
throws Exception {
-        List<String> deadServers = 
super.getChildrenKeys(getDeadZNodeParentPath());
-        for(String serverPath : deadServers){
-            if(serverPath.startsWith(serverType+UNDERLINE+host)){
-               String server = getDeadZNodeParentPath() + SINGLE_SLASH + 
serverPath;
-               super.remove(server);
+               List<String> deadServers = 
super.getChildrenKeys(getDeadZNodeParentPath());
+               for(String serverPath : deadServers){
+                       if(serverPath.startsWith(serverType+UNDERLINE+host)){
+                               String server = getDeadZNodeParentPath() + 
SINGLE_SLASH + serverPath;
+                               super.remove(server);
                                logger.info("{} server {} deleted from zk dead 
server path success" , serverType , host);
-            }
-        }
+                       }
+               }
        }
 
-       /**
-        * create zookeeper path according the zk node type.
-        * @param zkNodeType zookeeper node type
-        * @return  register zookeeper path
-        * @throws Exception
-        */
-       private String createZNodePath(ZKNodeType zkNodeType, String host) 
throws Exception {
-               // specify the format of stored data in ZK nodes
-               String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
-               // create temporary sequence nodes for master znode
-               String registerPath= getZNodeParentPath(zkNodeType) + 
SINGLE_SLASH + host;
-
-       super.persistEphemeral(registerPath, heartbeatZKInfo);
-               logger.info("register {} node {} success" , 
zkNodeType.toString(), registerPath);
-               return registerPath;
-       }
 
        /**
         * opType(add): if find dead server , then add to zk deadServerPath
@@ -326,7 +294,7 @@ public abstract class AbstractZKClient extends 
ZookeeperCachedOperator {
         */
        protected String getHostByEventDataPath(String path) {
                if(StringUtils.isEmpty(path)){
-                   logger.error("empty path!");
+                       logger.error("empty path!");
                        return "";
                }
                String[] pathArray = path.split(SINGLE_SLASH);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 6c38a68..e71cb74 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -39,7 +39,7 @@ public class ZookeeperCachedOperator extends 
ZookeeperOperator {
      */
     @Override
     protected void registerListener() {
-        treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot());
+        treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + 
"/nodes");
         logger.info("add listener to zk path: {}", 
getZookeeperConfig().getDsRoot());
         try {
             treeCache.start();
diff --git a/dolphinscheduler-ui/.env b/dolphinscheduler-ui/.env
index 4c7e96e..e676be6 100644
--- a/dolphinscheduler-ui/.env
+++ b/dolphinscheduler-ui/.env
@@ -17,4 +17,4 @@
 API_BASE = http://192.168.xx.xx:12345
 
 # If IP access is required for local development, remove the "#"
-#DEV_HOST = 192.168.xx.xx
\ No newline at end of file
+#DEV_HOST = 192.168.xx.xx

Reply via email to