This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 2bb5eba  add TaskInstanceCacheManager to receive Worker report 
result, modify master polling db transform to cache (#2021)
2bb5eba is described below

commit 2bb5ebaf3a20297b3cfa4d8e046f9a6479e72d5a
Author: qiaozhanwei <[email protected]>
AuthorDate: Wed Feb 26 18:46:05 2020 +0800

    add TaskInstanceCacheManager to receive Worker report result, modify master 
polling db transform to cache (#2021)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
---
 .../dolphinscheduler/common/utils/CommonUtils.java |   2 +-
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  22 +--
 .../remote/command/ExecuteTaskResponseCommand.java |   2 +-
 .../remote/entity/TaskExecutionContext.java        |  15 +-
 .../builder/TaskExecutionContextBuilder.java       |   1 +
 .../master/cache/TaskInstanceCacheManager.java     |  64 ++++++
 .../cache/impl/TaskInstanceCacheManagerImpl.java   | 115 +++++++++++
 .../server/master/processor/TaskAckProcessor.java  |  13 +-
 .../master/processor/TaskResponseProcessor.java    |  18 +-
 .../master/runner/MasterBaseTaskExecThread.java    |  79 ++++++--
 .../server/master/runner/MasterExecThread.java     |   4 +-
 .../server/master/runner/MasterTaskExecThread.java |  26 ++-
 .../worker/processor/TaskExecuteProcessor.java     |   1 +
 .../server/worker/processor/TaskKillProcessor.java |  19 ++
 .../server/worker/runner/TaskExecuteThread.java    |   8 +-
 .../worker/task/AbstractCommandExecutor.java       | 214 ++++++++++-----------
 .../server/worker/task/AbstractTask.java           |  30 ++-
 .../server/worker/task/AbstractYarnTask.java       |  20 +-
 .../server/worker/task/CommandExecuteResult.java   |  69 +++++++
 .../server/worker/task/PythonCommandExecutor.java  |  13 +-
 .../server/worker/task/ShellCommandExecutor.java   |  17 +-
 .../server/worker/task/TaskProps.java              |  84 +++++---
 .../server/worker/task/datax/DataxTask.java        |  20 +-
 .../worker/task/dependent/DependentTask.java       |   2 +-
 .../server/worker/task/flink/FlinkTask.java        |   2 +-
 .../server/worker/task/http/HttpTask.java          |   2 +-
 .../server/worker/task/python/PythonTask.java      |  21 +-
 .../server/worker/task/shell/ShellTask.java        |  16 +-
 .../server/worker/task/sql/SqlTask.java            |   8 +-
 .../worker/shell/ShellCommandExecutorTest.java     |   4 +-
 .../server/worker/sql/SqlExecutorTest.java         |   6 +-
 .../server/worker/task/datax/DataxTaskTest.java    |  12 +-
 .../worker/task/dependent/DependentTaskTest.java   |   2 +-
 .../conf/home/pages/dag/_source/formModel/log.vue  |   4 +-
 34 files changed, 679 insertions(+), 256 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
index b4b89bf..d15ede2 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
@@ -26,7 +26,7 @@ import java.io.File;
 /**
  * common utils
  */
-public class  CommonUtils {
+public class CommonUtils {
   private CommonUtils() {
     throw new IllegalStateException("CommonUtils class");
   }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 428f5d4..92cb3af 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -462,6 +462,14 @@ public class TaskInstance implements Serializable {
         this.workerGroupId = workerGroupId;
     }
 
+    public String getDependentResult() {
+        return dependentResult;
+    }
+
+    public void setDependentResult(String dependentResult) {
+        this.dependentResult = dependentResult;
+    }
+
     @Override
     public String toString() {
         return "TaskInstance{" +
@@ -481,27 +489,19 @@ public class TaskInstance implements Serializable {
                 ", logPath='" + logPath + '\'' +
                 ", retryTimes=" + retryTimes +
                 ", alertFlag=" + alertFlag +
-                ", flag=" + flag +
                 ", processInstance=" + processInstance +
                 ", processDefine=" + processDefine +
                 ", pid=" + pid +
                 ", appLink='" + appLink + '\'' +
                 ", flag=" + flag +
-                ", dependency=" + dependency +
+                ", dependency='" + dependency + '\'' +
                 ", duration=" + duration +
                 ", maxRetryTimes=" + maxRetryTimes +
                 ", retryInterval=" + retryInterval +
                 ", taskInstancePriority=" + taskInstancePriority +
                 ", processInstancePriority=" + processInstancePriority +
-                ", workGroupId=" + workerGroupId +
+                ", dependentResult='" + dependentResult + '\'' +
+                ", workerGroupId=" + workerGroupId +
                 '}';
     }
-
-    public String getDependentResult() {
-        return dependentResult;
-    }
-
-    public void setDependentResult(String dependentResult) {
-        this.dependentResult = dependentResult;
-    }
 }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index 6bbc2f7..d8253a8 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 response command
 */
public class ExecuteTaskResponseCommand implements Serializable {


    public ExecuteTaskResponseCommand() {
    }

    public ExecuteTaskResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    /**
     * package response command
     * @return command
     */
    public Command 
 convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 response command
 */
public class ExecuteTaskResponseCommand implements Serializable {


    public ExecuteTaskResponseCommand() {
    }

    public ExecuteTaskResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    /**
     * package response command
     * @return command
     */
    public Command 
 convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", status=" + status +
                ", endTime=" + endTime +
                '}';
    }
}
\ No newline at end of file
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
index e3da43a..711e3d8 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
@@ -52,11 +52,15 @@ public class TaskExecutionContext implements Serializable{
     private String executePath;
 
     /**
+     * log path
+     */
+    private String logPath;
+
+    /**
      *  task json
      */
     private String taskJson;
 
-
     /**
      *  process instance id
      */
@@ -228,6 +232,14 @@ public class TaskExecutionContext implements Serializable{
         this.cmdTypeIfComplement = cmdTypeIfComplement;
     }
 
+    public String getLogPath() {
+        return logPath;
+    }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
     @Override
     public String toString() {
         return "TaskExecutionContext{" +
@@ -236,6 +248,7 @@ public class TaskExecutionContext implements Serializable{
                 ", startTime=" + startTime +
                 ", taskType='" + taskType + '\'' +
                 ", executePath='" + executePath + '\'' +
+                ", logPath='" + logPath + '\'' +
                 ", taskJson='" + taskJson + '\'' +
                 ", processInstanceId=" + processInstanceId +
                 ", scheduleTime=" + scheduleTime +
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index a3ddd29..8cdd13e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -44,6 +44,7 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setTaskName(taskInstance.getName());
         taskExecutionContext.setStartTime(taskInstance.getStartTime());
         taskExecutionContext.setTaskType(taskInstance.getTaskType());
+        taskExecutionContext.setLogPath(taskInstance.getLogPath());
         taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
         taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
         return this;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
new file mode 100644
index 0000000..98d2a24
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cache;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+
+/**
+ *  task instance state manager
+ */
+public interface TaskInstanceCacheManager {
+
+    /**
+     * get taskInstance by taskInstance id
+     *
+     * @param taskInstanceId taskInstanceId
+     * @return taskInstance
+     */
+    TaskInstance getByTaskInstanceId(Integer taskInstanceId);
+
+    /**
+     * cache taskInstance
+     *
+     * @param taskExecutionContext taskExecutionContext
+     */
+    void cacheTaskInstance(TaskExecutionContext taskExecutionContext);
+
+    /**
+     * cache taskInstance
+     *
+     * @param taskAckCommand taskAckCommand
+     */
+    void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand);
+
+    /**
+     * cache taskInstance
+     *
+     * @param executeTaskResponseCommand executeTaskResponseCommand
+     */
+    void cacheTaskInstance(ExecuteTaskResponseCommand 
executeTaskResponseCommand);
+
+    /**
+     * remove taskInstance by taskInstanceId
+     * @param taskInstanceId taskInstanceId
+     */
+    void removeByTaskInstanceId(Integer taskInstanceId);
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
new file mode 100644
index 0000000..634a6a9
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.cache.impl;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  taskInstance state manager
+ */
+@Component
+public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
+
+    /**
+     * taskInstance caceh
+     */
+    private Map<Integer,TaskInstance> taskInstanceCache = new 
ConcurrentHashMap<>();
+
+
+    /**
+     * get taskInstance by taskInstance id
+     *
+     * @param taskInstanceId taskInstanceId
+     * @return taskInstance
+     */
+    @Override
+    public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
+        return taskInstanceCache.get(taskInstanceId);
+    }
+
+    /**
+     * cache taskInstance
+     *
+     * @param taskExecutionContext taskExecutionContext
+     */
+    @Override
+    public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) {
+        TaskInstance taskInstance = 
getByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        if (taskInstance == null){
+            taskInstance = new TaskInstance();
+        }
+        taskInstance.setId(taskExecutionContext.getTaskInstanceId());
+        taskInstance.setName(taskExecutionContext.getTaskName());
+        taskInstance.setStartTime(taskExecutionContext.getStartTime());
+        taskInstance.setTaskType(taskInstance.getTaskType());
+        taskInstance.setExecutePath(taskInstance.getExecutePath());
+        taskInstance.setTaskJson(taskInstance.getTaskJson());
+    }
+
+    /**
+     * cache taskInstance
+     *
+     * @param taskAckCommand taskAckCommand
+     */
+    @Override
+    public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) {
+        TaskInstance taskInstance = 
getByTaskInstanceId(taskAckCommand.getTaskInstanceId());
+        if (taskInstance == null){
+            taskInstance = new TaskInstance();
+        }
+        taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
+        taskInstance.setStartTime(taskAckCommand.getStartTime());
+        taskInstance.setHost(taskAckCommand.getHost());
+        taskInstance.setExecutePath(taskAckCommand.getExecutePath());
+        taskInstance.setLogPath(taskAckCommand.getLogPath());
+    }
+
+    /**
+     * cache taskInstance
+     *
+     * @param executeTaskResponseCommand executeTaskResponseCommand
+     */
+    @Override
+    public void cacheTaskInstance(ExecuteTaskResponseCommand 
executeTaskResponseCommand) {
+        TaskInstance taskInstance = 
getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId());
+        if (taskInstance == null){
+            taskInstance = new TaskInstance();
+        }
+        
taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus()));
+        taskInstance.setEndTime(executeTaskResponseCommand.getEndTime());
+    }
+
+    /**
+     * remove taskInstance by taskInstanceId
+     * @param taskInstanceId taskInstanceId
+     */
+    @Override
+    public void removeByTaskInstanceId(Integer taskInstanceId) {
+        taskInstanceCache.remove(taskInstanceId);
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 83da3b0..cf38579 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import 
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import 
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -42,8 +44,14 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
      */
     private final ProcessService processService;
 
+    /**
+     * taskInstance cache manager
+     */
+    private final TaskInstanceCacheManager taskInstanceCacheManager;
+
     public TaskAckProcessor(){
         this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
+        this.taskInstanceCacheManager = 
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
     }
 
     /**
@@ -55,7 +63,9 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
         ExecuteTaskAckCommand taskAckCommand = 
FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
-        logger.info("taskAckCommand : {}",taskAckCommand);
+        logger.info("taskAckCommand : {}", taskAckCommand);
+
+        taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
         /**
          * change Task state
          */
@@ -65,6 +75,7 @@ public class TaskAckProcessor implements 
NettyRequestProcessor {
                 taskAckCommand.getExecutePath(),
                 taskAckCommand.getLogPath(),
                 taskAckCommand.getTaskInstanceId());
+
     }
 
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index bbc710c..d6279c6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import 
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import 
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -42,8 +44,14 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
      */
     private final ProcessService processService;
 
+    /**
+     * taskInstance cache manager
+     */
+    private final TaskInstanceCacheManager taskInstanceCacheManager;
+
     public TaskResponseProcessor(){
         this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
+        this.taskInstanceCacheManager = 
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
     }
 
     /**
@@ -56,9 +64,15 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
-        logger.info("received command : {}", command);
+
         ExecuteTaskResponseCommand responseCommand = 
FastJsonSerializer.deserialize(command.getBody(), 
ExecuteTaskResponseCommand.class);
-        
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), 
responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
+        logger.info("received command : {}", responseCommand);
+
+        taskInstanceCacheManager.cacheTaskInstance(responseCommand);
+
+        
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
+                responseCommand.getEndTime(),
+                responseCommand.getTaskInstanceId());
     }
 
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 9bf69dd..d2a5439 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
 
+import static 
org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE;
+
 /**
  * master task exec base class
  */
@@ -163,6 +166,7 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
         String userQueue = 
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
         
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? 
tenant.getQueue() : userQueue);
         
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+        taskInstance.setExecutePath(getExecLocalPath(taskInstance));
 
         return TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
@@ -173,6 +177,19 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
 
 
     /**
+     * get execute local path
+     *
+     * @return execute local path
+     */
+    private String getExecLocalPath(TaskInstance taskInstance){
+        return 
FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getProcessInstance().getId(),
+                taskInstance.getId());
+    }
+
+
+    /**
      *  whehter tenant is null
      * @param tenant tenant
      * @param taskInstance taskInstance
@@ -187,19 +204,6 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
         }
         return false;
     }
-
-    /**
-     * get execute local path
-     *
-     * @return execute local path
-     */
-    private String getExecLocalPath(TaskInstance taskInstance){
-        return 
FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
-                taskInstance.getProcessDefine().getId(),
-                taskInstance.getProcessInstance().getId(),
-                taskInstance.getId());
-    }
-
     /**
      * submit master base task exec thread
      * @return TaskInstance
@@ -210,7 +214,7 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
 
         int retryTimes = 1;
         boolean submitDB = false;
-        boolean submitQueue = false;
+        boolean submitTask = false;
         TaskInstance task = null;
         while (retryTimes <= commitRetryTimes){
             try {
@@ -221,27 +225,60 @@ public class MasterBaseTaskExecThread implements 
Callable<Boolean> {
                         submitDB = true;
                     }
                 }
-                if(submitDB && !submitQueue){
-                    // submit task to queue
-                    submitQueue = dispatch(task);
+                if(submitDB && !submitTask){
+                    // dispatcht task
+                    submitTask = dispatchtTask(task);
                 }
-                if(submitDB && submitQueue){
+                if(submitDB && submitTask){
                     return task;
                 }
                 if(!submitDB){
                     logger.error("task commit to db failed , taskId {} has 
already retry {} times, please check the database", taskInstance.getId(), 
retryTimes);
-                }else if(!submitQueue){
-                    logger.error("task commit to queue failed , taskId {} has 
already retry {} times, please check the queue", taskInstance.getId(), 
retryTimes);
+                }else if(!submitTask){
+                    logger.error("task commit  failed , taskId {} has already 
retry {} times, please check", taskInstance.getId(), retryTimes);
                 }
                 Thread.sleep(commitRetryInterval);
             } catch (Exception e) {
-                logger.error("task commit to mysql and queue failed",e);
+                logger.error("task commit to mysql and dispatcht task 
failed",e);
             }
             retryTimes += 1;
         }
         return task;
     }
 
+
+
+    /**
+     * dispatch task
+     *
+     * Sub process tasks, tasks whose state is already finished and tasks that are
+     * already running are not handed to dispatch(TaskInstance) again; for them
+     * the method reports true so that the caller stops retrying.
+     *
+     * @param taskInstance taskInstance
+     * @return true if no dispatch is needed or dispatch(TaskInstance) returned true,
+     *         false if dispatch(TaskInstance) returned false or any exception was thrown
+     */
+    public Boolean dispatchtTask(TaskInstance taskInstance) {
+
+        try{
+            if(taskInstance.isSubProcess()){
+                return true;
+            }
+            if(taskInstance.getState().typeIsFinished()){
+                logger.info("submit task , but task [{}] state [{}] is already  finished. ",
+                        taskInstance.getName(), taskInstance.getState());
+                return true;
+            }
+            // task cannot submit when running
+            if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
+                logger.info("submit to task, but task [{}] state already be running. ",
+                        taskInstance.getName());
+                return true;
+            }
+            logger.info("task ready to submit: {}" , taskInstance);
+            boolean submitTask = dispatch(taskInstance);
+            // only claim success when dispatch really accepted the task
+            if(submitTask){
+                logger.info("master submit success, task : {}", taskInstance.getName());
+            }else{
+                logger.warn("master submit failed, task : {}", taskInstance.getName());
+            }
+            return submitTask;
+        }catch (Exception e){
+            logger.error("submit task  Exception: ", e);
+            // slf4j placeholders are {} ; a %s would be printed literally
+            logger.error("task error : {}", JSONUtils.toJson(taskInstance));
+            return false;
+        }
+    }
+
     /**
      * submit wait complete
      * @return true
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index d0f4927..576dc76 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -404,7 +404,7 @@ public class MasterExecThread implements Runnable {
     }
 
     /**
-     * submit task to execute
+     * TODO submit task to execute
      * @param taskInstance task instance
      * @return TaskInstance
      */
@@ -873,7 +873,7 @@ public class MasterExecThread implements Runnable {
                 }
                 logger.info("task :{}, id:{} complete, state is {} ",
                         task.getName(), task.getId(), 
task.getState().toString());
-                // node success , post node submit
+                //TODO  node success , post node submit
                 if(task.getState() == ExecutionStatus.SUCCESS){
                     completeTaskList.put(task.getName(), task);
                     submitPostNode(task.getName());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 66d1a3f..deac503 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,6 +26,9 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import com.alibaba.fastjson.JSONObject;
+import 
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
+import 
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +46,12 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
      */
     private static final Logger logger = 
LoggerFactory.getLogger(MasterTaskExecThread.class);
 
+
+    /**
+     * taskInstance state manager
+     */
+    private TaskInstanceCacheManager taskInstanceCacheManager;
+
     /**
      * constructor of MasterTaskExecThread
      * @param taskInstance      task instance
@@ -50,6 +59,7 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
      */
     public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance 
processInstance){
         super(taskInstance, processInstance);
+        this.taskInstanceCacheManager = 
SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
     }
 
     /**
@@ -67,7 +77,7 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
     private Boolean alreadyKilled = false;
 
     /**
-     * submit task instance and wait complete
+     * TODO submit task instance and wait complete
      * @return true is task quit is true
      */
     @Override
@@ -89,12 +99,16 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
     }
 
     /**
-     * wait task quit
+     * TODO poll the database here
+     * TODO wait task quit
      * @return true if task quit success
      */
     public Boolean waitTaskQuit(){
         // query new state
-        taskInstance = 
processService.findTaskInstanceById(taskInstance.getId());
+        // keep the id in a local first: the cache lookup may return null, and the
+        // database fallback must not dereference the then-null taskInstance
+        int taskInstanceId = taskInstance.getId();
+        taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstanceId);
+        if (taskInstance == null){
+            // cache miss : fall back to the database
+            taskInstance = processService.findTaskInstanceById(taskInstanceId);
+        }
         logger.info("wait task: process id: {}, task id:{}, task name:{} 
complete",
                 this.taskInstance.getProcessInstanceId(), 
this.taskInstance.getId(), this.taskInstance.getName());
         // task time out
@@ -119,6 +133,8 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
                 }
                 // task instance finished
                 if (taskInstance.getState().typeIsFinished()){
+                    // if task is final result , then remove taskInstance from 
cache
+                    
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
                     break;
                 }
                 if(checkTimeout){
@@ -133,7 +149,7 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
                     }
                 }
                 // updateProcessInstance task instance
-                taskInstance = 
processService.findTaskInstanceById(taskInstance.getId());
+                taskInstance = 
taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
                 processInstance = 
processService.findProcessInstanceById(processInstance.getId());
                 Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
@@ -149,6 +165,7 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
 
 
     /**
+     *  TODO kill task
      *  task instance add queue , waiting worker to kill
      */
     private void cancelTaskInstance(){
@@ -162,6 +179,7 @@ public class MasterTaskExecThread extends 
MasterBaseTaskExecThread {
         }
         String queueValue = String.format("%s-%d",
                 host, taskInstance.getId());
+        // TODO write here
         taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue);
 
         logger.info("master add kill task :{} id:{} to kill queue",
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 818e223..37dcc2c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -155,6 +155,7 @@ public class TaskExecuteProcessor implements 
NettyRequestProcessor {
         }else{
             ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
         }
+        taskExecutionContext.setLogPath(ackCommand.getLogPath());
         return ackCommand;
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 2985374..1f5aa3c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -42,6 +42,12 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
 
     private final Logger logger = 
LoggerFactory.getLogger(TaskKillProcessor.class);
 
+    /**
+     *  task kill process
+     *
+     * @param channel channel
+     * @param command command
+     */
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
@@ -51,6 +57,11 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
     }
 
 
+    /**
+     * kill task logic
+     *
+     * @param killCommand killCommand
+     */
     private void doKill(KillTaskRequestCommand killCommand){
         try {
             if(killCommand.getProcessId() == 0 ){
@@ -71,6 +82,14 @@ public class TaskKillProcessor implements 
NettyRequestProcessor {
         }
     }
 
+    /**
+     *  kill yarn job
+     *
+     * @param host host
+     * @param logPath logPath
+     * @param executePath executePath
+     * @param tenantCode tenantCode
+     */
     public void killYarnJob(String host, String logPath, String executePath, 
String tenantCode) {
         try {
             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index be89401..c0dd25b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -103,7 +103,6 @@ public class TaskExecuteThread implements Runnable {
 
             // set task props
             TaskProps taskProps = new TaskProps(taskNode.getParams(),
-                    taskExecutionContext.getExecutePath(),
                     taskExecutionContext.getScheduleTime(),
                     taskExecutionContext.getTaskName(),
                     taskExecutionContext.getTaskType(),
@@ -114,7 +113,10 @@ public class TaskExecuteThread implements Runnable {
                     taskExecutionContext.getStartTime(),
                     getGlobalParamsMap(),
                     null,
-                    
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()));
+                    
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
+                    OSUtils.getHost(),
+                    taskExecutionContext.getLogPath(),
+                    taskExecutionContext.getExecutePath());
             // set task timeout
             setTaskTimeout(taskProps, taskNode);
 
@@ -142,7 +144,7 @@ public class TaskExecuteThread implements Runnable {
             // task result process
             task.after();
 
-            //
+
             responseCommand.setStatus(task.getExitStatus().getCode());
             responseCommand.setEndTime(new Date());
             logger.info("task instance id : {},task final status : {}", 
taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index c473f3a..c2e9e28 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -18,13 +18,17 @@ package org.apache.dolphinscheduler.server.worker.task;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
 import org.slf4j.Logger;
 
 import java.io.*;
@@ -37,6 +41,8 @@ import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.dolphinscheduler.common.Constants.*;
+
 /**
  * abstract command executor
  */
@@ -69,7 +75,7 @@ public abstract class AbstractCommandExecutor {
     /**
      *  task appId
      */
-    protected final int taskInstId;
+    protected final int taskInstanceId;
 
     /**
      *  tenant code , execute task linux user
@@ -101,93 +107,37 @@ public abstract class AbstractCommandExecutor {
      */
     protected final List<String> logBuffer;
 
+    /**
+     *  log path
+     */
+    private String logPath;
+
+    /**
+     *  execute path
+     */
+    private String executePath;
 
     public AbstractCommandExecutor(Consumer<List<String>> logHandler,
-                                   String taskDir, String taskAppId,int 
taskInstId,String tenantCode, String envFile,
-                                   Date startTime, int timeout, Logger logger){
+                                   String taskDir,
+                                   String taskAppId,
+                                   Integer taskInstanceId,
+                                   String tenantCode, String envFile,
+                                   Date startTime, int timeout, String 
logPath,String executePath,Logger logger){
         this.logHandler = logHandler;
         this.taskDir = taskDir;
         this.taskAppId = taskAppId;
-        this.taskInstId = taskInstId;
+        this.taskInstanceId = taskInstanceId;
         this.tenantCode = tenantCode;
         this.envFile = envFile;
         this.startTime = startTime;
         this.timeout = timeout;
+        this.logPath = logPath;
+        this.executePath = executePath;
         this.logger = logger;
         this.logBuffer = Collections.synchronizedList(new ArrayList<>());
     }
 
     /**
-     * task specific execution logic
-     *
-     * @param execCommand   exec command
-     * @param processService    process dao
-     * @return exit status code
-     */
-    public int run(String execCommand, ProcessService processService) {
-        int exitStatusCode;
-
-        try {
-            if (StringUtils.isEmpty(execCommand)) {
-                exitStatusCode = 0;
-                return exitStatusCode;
-            }
-
-            String commandFilePath = buildCommandFilePath();
-
-            // create command file if not exists
-            createCommandFileIfNotExists(execCommand, commandFilePath);
-
-            //build process
-            buildProcess(commandFilePath);
-
-            // parse process output
-            parseProcessOutput(process);
-
-            // get process id
-            int pid = getProcessId(process);
-
-            processService.updatePidByTaskInstId(taskInstId, pid, "");
-
-            logger.info("process start, process id is: {}", pid);
-
-            // if timeout occurs, exit directly
-            long remainTime = getRemaintime();
-
-            // waiting for the run to finish
-            boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
-
-            if (status) {
-                exitStatusCode = process.exitValue();
-                logger.info("process has exited, work dir:{}, pid:{} 
,exitStatusCode:{}", taskDir, pid,exitStatusCode);
-                //update process state to db
-                exitStatusCode = updateState(processService, exitStatusCode, 
pid, taskInstId);
-
-            } else {
-                TaskInstance taskInstance = 
processService.findTaskInstanceById(taskInstId);
-                if (taskInstance == null) {
-                    logger.error("task instance id:{} not exist", taskInstId);
-                } else {
-                    ProcessUtils.kill(taskInstance);
-                }
-                exitStatusCode = -1;
-                logger.warn("process timeout, work dir:{}, pid:{}", taskDir, 
pid);
-            }
-
-        } catch (InterruptedException e) {
-            exitStatusCode = -1;
-            logger.error(String.format("interrupt exception: {}, task may be 
cancelled or killed",e.getMessage()), e);
-            throw new RuntimeException("interrupt exception. exitCode is :  " 
+ exitStatusCode);
-        } catch (Exception e) {
-            exitStatusCode = -1;
-            logger.error(e.getMessage(), e);
-            throw new RuntimeException("process error . exitCode is :  " + 
exitStatusCode);
-        }
-
-        return exitStatusCode;
-    }
-
-    /**
      * build process
      *
      * @param commandFile command file
@@ -217,35 +167,80 @@ public abstract class AbstractCommandExecutor {
     }
 
     /**
-     * update process state to db
-     *
-     * @param processService        process dao
-     * @param exitStatusCode    exit status code
-     * @param pid               process id
-     * @param taskInstId        task instance id
-     * @return exit status code
+     * task specific execution logic
+     * @param execCommand execCommand
+     * @return CommandExecuteResult
+     * @throws Exception
      */
-    private int updateState(ProcessService processService, int exitStatusCode, 
int pid, int taskInstId) {
-        //get yarn state by log
-        if (exitStatusCode == 0) {
-            TaskInstance taskInstance = 
processService.findTaskInstanceById(taskInstId);
-            logger.info("process id is {}", pid);
-
-            List<String> appIds = getAppLinks(taskInstance.getLogPath());
-            if (appIds.size() > 0) {
-                String appUrl = String.join(Constants.COMMA, appIds);
-                logger.info("yarn log url:{}",appUrl);
-                processService.updatePidByTaskInstId(taskInstId, pid, appUrl);
-            }
+    public CommandExecuteResult run(String execCommand) throws Exception{
 
-            // check if all operations are completed
-            if (!isSuccessOfYarnState(appIds)) {
-                exitStatusCode = -1;
-            }
+        CommandExecuteResult result = new CommandExecuteResult();
+
+
+        if (StringUtils.isEmpty(execCommand)) {
+            return result;
+        }
+
+        String commandFilePath = buildCommandFilePath();
+
+        // create command file if not exists
+        createCommandFileIfNotExists(execCommand, commandFilePath);
+
+        //build process
+        buildProcess(commandFilePath);
+
+        // parse process output
+        parseProcessOutput(process);
+
+
+        Integer processId = getProcessId(process);
+
+        result.setProcessId(processId);
+
+        // print process id
+        logger.info("process start, process id is: {}", processId);
+
+        // if timeout occurs, exit directly
+        long remainTime = getRemaintime();
+
+        // waiting for the run to finish
+        boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
+
+        // if SHELL task exit
+        if (status) {
+            // SHELL task state : exitValue() may only be called once the process has
+            // terminated, otherwise it throws IllegalThreadStateException
+            int exitValue = process.exitValue();
+            result.setExitStatusCode(exitValue);
+
+            logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
+                    taskDir,
+                    processId,
+                    exitValue);
+
+            // set appIds
+            List<String> appIds = getAppIds(logPath);
+            result.setAppIds(String.join(Constants.COMMA, appIds));
+
+            // if yarn task , yarn state is final state ; a non-zero SHELL exit code is
+            // kept as it is and must never be turned into success by the yarn check
+            if (exitValue == EXIT_CODE_SUCCESS && !isSuccessOfYarnState(appIds)) {
+                result.setExitStatusCode(EXIT_CODE_FAILURE);
+            }
+        } else {
+            // waitFor timed out : the process is still alive and has no exit code yet
+            logger.error("process has not exited within {} seconds, execute path:{}, processId:{} , ready to kill ...",
+                    remainTime,
+                    taskDir,
+                    processId);
+            TaskInstance taskInstance = new TaskInstance();
+            taskInstance.setPid(processId);
+            taskInstance.setHost(OSUtils.getHost());
+            taskInstance.setLogPath(logPath);
+            taskInstance.setExecutePath(executePath);
+
+            ProcessInstance processInstance = new ProcessInstance();
+            processInstance.setTenantCode(tenantCode);
+            taskInstance.setProcessInstance(processInstance);
+
+            ProcessUtils.kill(taskInstance);
+            result.setExitStatusCode(EXIT_CODE_FAILURE);
         }
-        return exitStatusCode;
+        return result;
     }
 
+
     /**
      * cancel application
      * @throws Exception exception
@@ -378,10 +373,6 @@ public abstract class AbstractCommandExecutor {
         parseProcessOutputExecutorService.shutdown();
     }
 
-    public int getPid() {
-        return getProcessId(process);
-    }
-
     /**
      * check yarn state
      *
@@ -389,11 +380,10 @@ public abstract class AbstractCommandExecutor {
      * @return is success of yarn task state
      */
     public boolean isSuccessOfYarnState(List<String> appIds) {
-
         boolean result = true;
         try {
             for (String appId : appIds) {
-                while(true){
+                while(Stopper.isRunning()){
                     ExecutionStatus applicationStatus = 
HadoopUtils.getInstance().getApplicationStatus(appId);
                     logger.info("appId:{}, final 
state:{}",appId,applicationStatus.name());
                     if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
@@ -406,7 +396,7 @@ public abstract class AbstractCommandExecutor {
                     }
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                 }
-           }
+            }
         } catch (Exception e) {
             logger.error(String.format("yarn applications: %s  status failed 
", appIds.toString()),e);
             result = false;
@@ -415,15 +405,20 @@ public abstract class AbstractCommandExecutor {
 
     }
 
+    /**
+     * get process id
+     *
+     * @return result of getProcessId(process) for the process field of this executor
+     */
+    public int getProcessId() {
+        return getProcessId(process);
+    }
+
     /**
      * get app links
-     * @param fileName file name
+     *
+     * @param logPath log path
      * @return app id list
      */
-    private List<String> getAppLinks(String fileName) {
-        List<String> logs = convertFile2List(fileName);
+    private List<String> getAppIds(String logPath) {
+        List<String> logs = convertFile2List(logPath);
 
-        List<String> appIds = new ArrayList<String>();
+        List<String> appIds = new ArrayList<>();
         /**
          * analysis log,get submited yarn application id
          */
@@ -565,6 +560,5 @@ public abstract class AbstractCommandExecutor {
     }
     protected abstract String buildCommandFilePath();
     protected abstract String commandInterpreter();
-    protected abstract boolean checkFindApp(String line);
     protected abstract void createCommandFileIfNotExists(String execCommand, 
String commandFile) throws IOException;
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index f2772d0..e7c4e69 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -56,6 +56,17 @@ public abstract class AbstractTask {
 
 
     /**
+     *  SHELL process pid
+     */
+    protected Integer processId;
+
+    /**
+     * other resource manager appId , for example : YARN etc
+     */
+    protected String appIds;
+
+
+    /**
      * cancel
      */
     protected volatile boolean cancel = false;
@@ -119,6 +130,22 @@ public abstract class AbstractTask {
         this.exitStatusCode = exitStatusCode;
     }
 
+    /**
+     * get appIds
+     * @return appIds
+     */
+    public String getAppIds() {
+        return appIds;
+    }
+
+    /**
+     * set appIds
+     * @param appIds appIds
+     */
+    public void setAppIds(String appIds) {
+        this.appIds = appIds;
+    }
+
+    /**
+     * get process id
+     * @return process id
+     */
+    public Integer getProcessId() {
+        return processId;
+    }
+
+    /**
+     * set process id
+     * @param processId process id
+     */
+    public void setProcessId(Integer processId) {
+        this.processId = processId;
+    }
+
     /**
      * get task parameters
      * @return AbstractParameters
@@ -126,6 +153,7 @@ public abstract class AbstractTask {
     public abstract AbstractParameters getParameters();
 
 
+
     /**
      * result processing
      */
@@ -146,7 +174,7 @@ public abstract class AbstractTask {
                         && paramsMap.containsKey("v_proc_date")){
                     String vProcDate = paramsMap.get("v_proc_date").getValue();
                     if (!StringUtils.isEmpty(vProcDate)){
-                        TaskRecordStatus taskRecordState = 
TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate);
+                        TaskRecordStatus taskRecordState = 
TaskRecordDao.getTaskRecordState(taskProps.getTaskName(), vProcDate);
                         logger.info("task record status : {}",taskRecordState);
                         if (taskRecordState == TaskRecordStatus.FAILURE){
                             setExitStatusCode(Constants.EXIT_CODE_FAILURE);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
index 39f4dfb..7a6aca9 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
@@ -26,11 +26,6 @@ import org.slf4j.Logger;
  *  abstract yarn task
  */
 public abstract class AbstractYarnTask extends AbstractTask {
-
-  /**
-   *  process instance
-   */
-
   /**
    *  process task
    */
@@ -50,21 +45,26 @@ public abstract class AbstractYarnTask extends AbstractTask 
{
     super(taskProps, logger);
     this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
     this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-            taskProps.getTaskDir(),
+            taskProps.getExecutePath(),
             taskProps.getTaskAppId(),
-            taskProps.getTaskInstId(),
+            taskProps.getTaskInstanceId(),
             taskProps.getTenantCode(),
             taskProps.getEnvFile(),
             taskProps.getTaskStartTime(),
             taskProps.getTaskTimeout(),
+            taskProps.getLogPath(),
+            taskProps.getExecutePath(),
             logger);
   }
 
   @Override
   public void handle() throws Exception {
     try {
-      // construct process
-      exitStatusCode = shellCommandExecutor.run(buildCommand(), 
processService);
+      // SHELL task exit code
+      CommandExecuteResult commandExecuteResult = 
shellCommandExecutor.run(buildCommand());
+      setExitStatusCode(commandExecuteResult.getExitStatusCode());
+      setAppIds(commandExecuteResult.getAppIds());
+      setProcessId(commandExecuteResult.getProcessId());
     } catch (Exception e) {
       logger.error("yarn process failure", e);
       exitStatusCode = -1;
@@ -82,7 +82,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
     cancel = true;
     // cancel process
     shellCommandExecutor.cancelApplication();
-    TaskInstance taskInstance = 
processService.findTaskInstanceById(taskProps.getTaskInstId());
+    TaskInstance taskInstance = 
processService.findTaskInstanceById(taskProps.getTaskInstanceId());
     if (status && taskInstance != null){
       ProcessUtils.killYarnJob(taskInstance);
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java
new file mode 100644
index 0000000..5d1afe5
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+/**
+ * command execute result :
+ * holds the exit status code, the appIds and the process id of an executed command
+ */
+public class CommandExecuteResult {
+
+    /**
+     * command exit code
+     */
+    private Integer exitStatusCode;
+
+    /**
+     *  appIds
+     */
+    private String appIds;
+
+    /**
+     * process id
+     */
+    private Integer processId;
+
+
+    /**
+     * create a result whose exit status code is 0 ;
+     * appIds and process id stay null until they are set
+     */
+    public CommandExecuteResult(){
+        this.exitStatusCode = 0;
+    }
+
+
+    /**
+     * get exit status code
+     * @return exit status code
+     */
+    public Integer getExitStatusCode() {
+        return exitStatusCode;
+    }
+
+    /**
+     * set exit status code
+     * @param exitStatusCode exit status code
+     */
+    public void setExitStatusCode(Integer exitStatusCode) {
+        this.exitStatusCode = exitStatusCode;
+    }
+
+    /**
+     * get appIds
+     * @return appIds
+     */
+    public String getAppIds() {
+        return appIds;
+    }
+
+    /**
+     * set appIds
+     * @param appIds appIds
+     */
+    public void setAppIds(String appIds) {
+        this.appIds = appIds;
+    }
+
+    /**
+     * get process id
+     * @return process id
+     */
+    public Integer getProcessId() {
+        return processId;
+    }
+
+    /**
+     * set process id
+     * @param processId process id
+     */
+    public void setProcessId(Integer processId) {
+        this.processId = processId;
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
index a673134..544a355 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
@@ -67,8 +67,10 @@ public class PythonCommandExecutor extends 
AbstractCommandExecutor {
                                  String envFile,
                                  Date startTime,
                                  int timeout,
+                                 String logPath,
+                                 String executePath,
                                  Logger logger) {
-        super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, 
startTime, timeout, logger);
+        super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, 
startTime, timeout,logPath,executePath,logger);
     }
 
 
@@ -132,15 +134,6 @@ public class PythonCommandExecutor extends 
AbstractCommandExecutor {
         return pythonHome;
     }
 
-    /**
-     * check find yarn application id
-     * @param line line
-     * @return boolean
-     */
-    @Override
-    protected boolean checkFindApp(String line) {
-        return true;
-    }
 
 
     /**
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index db46d0d..7985253 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -53,13 +53,15 @@ public class ShellCommandExecutor extends 
AbstractCommandExecutor {
     public ShellCommandExecutor(Consumer<List<String>> logHandler,
                                 String taskDir,
                                 String taskAppId,
-                                int taskInstId,
+                                Integer taskInstId,
                                 String tenantCode,
                                 String envFile,
                                 Date startTime,
-                                int timeout,
+                                Integer timeout,
+                                String logPath,
+                                String executePath,
                                 Logger logger) {
-        super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, 
startTime, timeout, logger);
+        super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, 
startTime, timeout,logPath,executePath,logger);
     }
 
 
@@ -78,15 +80,6 @@ public class ShellCommandExecutor extends 
AbstractCommandExecutor {
         return SH;
     }
 
-    /**
-     * check find yarn application id
-     * @param line line
-     * @return true if line contains task app id
-     */
-    @Override
-    protected boolean checkFindApp(String line) {
-        return line.contains(taskAppId);
-    }
 
     /**
      * create command file if not exists
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
index 8e5644e..a7b66bb 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
@@ -35,12 +35,12 @@ public class TaskProps {
   /**
    * task node name
    **/
-  private String nodeName;
+  private String taskName;
 
   /**
    * task instance id
    **/
-  private int taskInstId;
+  private int taskInstanceId;
 
   /**
    * tenant code , execute task linux user
@@ -58,11 +58,6 @@ public class TaskProps {
   private String taskParams;
 
   /**
-   * task dir
-   **/
-  private String taskDir;
-
-  /**
    * queue
    **/
   private String queue;
@@ -111,6 +106,22 @@ public class TaskProps {
    */
   private CommandType cmdTypeIfComplement;
 
+
+  /**
+   *  host
+   */
+  private String host;
+
+  /**
+   *  log path
+   */
+  private String logPath;
+
+  /**
+   * execute path
+   */
+  private String executePath;
+
   /**
    * constructor
    */
@@ -123,7 +134,7 @@ public class TaskProps {
    * @param scheduleTime        schedule time
    * @param nodeName            node name
    * @param taskType            task type
-   * @param taskInstId          task instance id
+   * @param taskInstanceId      task instance id
    * @param envFile             env file
    * @param tenantCode          tenant code
    * @param queue               queue
@@ -133,24 +144,25 @@ public class TaskProps {
    * @param cmdTypeIfComplement cmd type if complement
    */
   public TaskProps(String taskParams,
-                   String taskDir,
                    Date scheduleTime,
                    String nodeName,
                    String taskType,
-                   int taskInstId,
+                   int taskInstanceId,
                    String envFile,
                    String tenantCode,
                    String queue,
                    Date taskStartTime,
                    Map<String, String> definedParams,
                    String dependence,
-                   CommandType cmdTypeIfComplement){
+                   CommandType cmdTypeIfComplement,
+                   String host,
+                   String logPath,
+                   String executePath){
     this.taskParams = taskParams;
-    this.taskDir = taskDir;
     this.scheduleTime = scheduleTime;
-    this.nodeName = nodeName;
+    this.taskName = nodeName;
     this.taskType = taskType;
-    this.taskInstId = taskInstId;
+    this.taskInstanceId = taskInstanceId;
     this.envFile = envFile;
     this.tenantCode = tenantCode;
     this.queue = queue;
@@ -158,7 +170,9 @@ public class TaskProps {
     this.definedParams = definedParams;
     this.dependence = dependence;
     this.cmdTypeIfComplement = cmdTypeIfComplement;
-
+    this.host = host;
+    this.logPath = logPath;
+    this.executePath = executePath;
   }
 
   public String getTenantCode() {
@@ -177,12 +191,12 @@ public class TaskProps {
     this.taskParams = taskParams;
   }
 
-  public String getTaskDir() {
-    return taskDir;
+  public String getExecutePath() {
+    return executePath;
   }
 
-  public void setTaskDir(String taskDir) {
-    this.taskDir = taskDir;
+  public void setExecutePath(String executePath) {
+    this.executePath = executePath;
   }
 
   public Map<String, String> getDefinedParams() {
@@ -202,20 +216,20 @@ public class TaskProps {
   }
 
 
-  public String getNodeName() {
-    return nodeName;
+  public String getTaskName() {
+    return taskName;
   }
 
-  public void setNodeName(String nodeName) {
-    this.nodeName = nodeName;
+  public void setTaskName(String taskName) {
+    this.taskName = taskName;
   }
 
-  public int getTaskInstId() {
-    return taskInstId;
+  public int getTaskInstanceId() {
+    return taskInstanceId;
   }
 
-  public void setTaskInstId(int taskInstId) {
-    this.taskInstId = taskInstId;
+  public void setTaskInstanceId(int taskInstanceId) {
+    this.taskInstanceId = taskInstanceId;
   }
 
   public String getQueue() {
@@ -291,6 +305,22 @@ public class TaskProps {
     this.cmdTypeIfComplement = cmdTypeIfComplement;
   }
 
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public String getLogPath() {
+    return logPath;
+  }
+
+  public void setLogPath(String logPath) {
+    this.logPath = logPath;
+  }
+
   /**
    * get parameters map
    * @return user defined params map
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index ef941cd..7c867f1 100755
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -52,6 +52,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.server.utils.DataxUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -121,12 +122,12 @@ public class DataxTask extends AbstractTask {
     public DataxTask(TaskProps props, Logger logger) {
         super(props, logger);
 
-        this.taskDir = props.getTaskDir();
+        this.taskDir = props.getExecutePath();
         logger.info("task dir : {}", taskDir);
 
-        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
props.getTaskDir(), props.getTaskAppId(),
-            props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), 
props.getTaskStartTime(),
-            props.getTaskTimeout(), logger);
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
props.getExecutePath(), props.getTaskAppId(),
+            props.getTaskInstanceId(), props.getTenantCode(), 
props.getEnvFile(), props.getTaskStartTime(),
+            props.getTaskTimeout(), 
props.getLogPath(),props.getExecutePath(),logger);
 
         this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
     }
@@ -160,10 +161,15 @@ public class DataxTask extends AbstractTask {
             // run datax process
             String jsonFilePath = buildDataxJsonFile();
             String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
-            exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, 
processService);
+            CommandExecuteResult commandExecuteResult = 
shellCommandExecutor.run(shellCommandFilePath);
+
+            setExitStatusCode(commandExecuteResult.getExitStatusCode());
+            setAppIds(commandExecuteResult.getAppIds());
+            setProcessId(commandExecuteResult.getProcessId());
         }
         catch (Exception e) {
-            exitStatusCode = -1;
+            logger.error("datax task failure", e);
+            setExitStatusCode(Constants.EXIT_CODE_FAILURE);
             throw e;
         }
     }
@@ -355,7 +361,7 @@ public class DataxTask extends AbstractTask {
         String dataxCommand = sbr.toString();
 
         // find process instance by task id
-        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
 
         // combining local and global parameters
         Map<String, Property> paramsMap = 
ParamUtils.convert(taskProps.getUserDefParamsMap(),
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
index f074d57..22cd60e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
@@ -107,7 +107,7 @@ public class DependentTask extends AbstractTask {
         try{
             TaskInstance taskInstance = null;
             while(Stopper.isRunning()){
-                taskInstance = 
processService.findTaskInstanceById(this.taskProps.getTaskInstId());
+                taskInstance = 
processService.findTaskInstanceById(this.taskProps.getTaskInstanceId());
 
                 if(taskInstance == null){
                     exitStatusCode = -1;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index c562fbe..ead61f0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -68,7 +68,7 @@ public class FlinkTask extends AbstractYarnTask {
     if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
       String args = flinkParameters.getMainArgs();
       // get process instance by task instance id
-      ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+      ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
 
       /**
        *  combining local and global parameters
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index c925f90..c1d1ed8 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -138,7 +138,7 @@ public class HttpTask extends AbstractTask {
      */
     protected CloseableHttpResponse sendRequest(CloseableHttpClient client) 
throws IOException {
         RequestBuilder builder = createRequestBuilder();
-        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
 
         Map<String, Property> paramsMap = 
ParamUtils.convert(taskProps.getUserDefParamsMap(),
                 taskProps.getDefinedParams(),
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index fc212f8..cae5324 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.server.worker.task.python;
 
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.python.PythonParameters;
@@ -24,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -65,16 +67,18 @@ public class PythonTask extends AbstractTask {
   public PythonTask(TaskProps taskProps, Logger logger) {
     super(taskProps, logger);
 
-    this.taskDir = taskProps.getTaskDir();
+    this.taskDir = taskProps.getExecutePath();
 
     this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
-            taskProps.getTaskDir(),
+            taskProps.getExecutePath(),
             taskProps.getTaskAppId(),
-            taskProps.getTaskInstId(),
+            taskProps.getTaskInstanceId(),
             taskProps.getTenantCode(),
             taskProps.getEnvFile(),
             taskProps.getTaskStartTime(),
             taskProps.getTaskTimeout(),
+            taskProps.getLogPath(),
+            taskProps.getExecutePath(),
             logger);
     this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
   }
@@ -94,10 +98,15 @@ public class PythonTask extends AbstractTask {
   public void handle() throws Exception {
     try {
       //  construct process
-      exitStatusCode = pythonCommandExecutor.run(buildCommand(), 
processService);
-    } catch (Exception e) {
+      CommandExecuteResult commandExecuteResult = 
pythonCommandExecutor.run(buildCommand());
+
+      setExitStatusCode(commandExecuteResult.getExitStatusCode());
+      setAppIds(commandExecuteResult.getAppIds());
+      setProcessId(commandExecuteResult.getProcessId());
+    }
+    catch (Exception e) {
       logger.error("python task failure", e);
-      exitStatusCode = -1;
+      setExitStatusCode(Constants.EXIT_CODE_FAILURE);
       throw e;
     }
   }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 5704c80..b703a44 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -74,15 +75,17 @@ public class ShellTask extends AbstractTask {
   public ShellTask(TaskProps taskProps, Logger logger) {
     super(taskProps, logger);
 
-    this.taskDir = taskProps.getTaskDir();
+    this.taskDir = taskProps.getExecutePath();
 
-    this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskProps.getTaskDir(),
+    this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, 
taskProps.getExecutePath(),
             taskProps.getTaskAppId(),
-            taskProps.getTaskInstId(),
+            taskProps.getTaskInstanceId(),
             taskProps.getTenantCode(),
             taskProps.getEnvFile(),
             taskProps.getTaskStartTime(),
             taskProps.getTaskTimeout(),
+            taskProps.getLogPath(),
+            taskProps.getExecutePath(),
             logger);
     this.processService = 
SpringApplicationContext.getBean(ProcessService.class);
   }
@@ -102,10 +105,13 @@ public class ShellTask extends AbstractTask {
   public void handle() throws Exception {
     try {
       // construct process
-      exitStatusCode = shellCommandExecutor.run(buildCommand(), 
processService);
+      CommandExecuteResult commandExecuteResult = 
shellCommandExecutor.run(buildCommand());
+      setExitStatusCode(commandExecuteResult.getExitStatusCode());
+      setAppIds(commandExecuteResult.getAppIds());
+      setProcessId(commandExecuteResult.getProcessId());
     } catch (Exception e) {
       logger.error("shell task failure", e);
-      exitStatusCode = -1;
+      setExitStatusCode(Constants.EXIT_CODE_FAILURE);
       throw e;
     }
   }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index aae11f5..e3a4cf7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -316,7 +316,7 @@ public class SqlTask extends AbstractTask {
                             sendAttachment(sqlParameters.getTitle(),
                                     JSONObject.toJSONString(resultJSONArray, 
SerializerFeature.WriteMapNullValue));
                         }else{
-                            sendAttachment(taskProps.getNodeName() + " query 
resultsets ",
+                            sendAttachment(taskProps.getTaskName() + " query 
resultsets ",
                                     JSONObject.toJSONString(resultJSONArray, 
SerializerFeature.WriteMapNullValue));
                         }
                     }
@@ -384,7 +384,7 @@ public class SqlTask extends AbstractTask {
     public void sendAttachment(String title,String content){
 
         //  process instance
-        ProcessInstance instance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+        ProcessInstance instance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
 
         List<User> users = 
alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
 
@@ -471,7 +471,7 @@ public class SqlTask extends AbstractTask {
      */
     private void checkUdfPermission(Integer[] udfFunIds) throws Exception{
         //  process instance
-        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
         int userId = processInstance.getExecutorId();
 
         PermissionCheck<Integer> permissionCheckUdf = new 
PermissionCheck<Integer>(AuthorizationType.UDF, 
processService,udfFunIds,userId,logger);
@@ -485,7 +485,7 @@ public class SqlTask extends AbstractTask {
      */
     private void checkDataSourcePermission(int dataSourceId) throws Exception{
         //  process instance
-        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+        ProcessInstance processInstance = 
processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
         int userId = processInstance.getExecutorId();
 
         PermissionCheck<Integer> permissionCheckDataSource = new 
PermissionCheck<Integer>(AuthorizationType.DATASOURCE, processService,new 
Integer[]{dataSourceId},userId,logger);
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
index 5d42636..1a8a4ff 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
@@ -55,13 +55,13 @@ public class ShellCommandExecutorTest {
 
         TaskProps taskProps = new TaskProps();
         // processDefineId_processInstanceId_taskInstanceId
-        
taskProps.setTaskDir("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657");
+        
taskProps.setExecutePath("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657");
         taskProps.setTaskAppId("36_2864_7657");
         // set tenant -> task execute linux user
         taskProps.setTenantCode("hdfs");
         taskProps.setTaskStartTime(new Date());
         taskProps.setTaskTimeout(360000);
-        taskProps.setTaskInstId(7657);
+        taskProps.setTaskInstanceId(7657);
 
 
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
index c395eab..32e0b2f 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
@@ -97,15 +97,15 @@ public class SqlExecutorTest {
      */
     private void sharedTestSqlTask(String nodeName, String taskAppId, String 
tenantCode, int taskInstId) throws Exception {
         TaskProps taskProps = new TaskProps();
-        taskProps.setTaskDir("");
+        taskProps.setExecutePath("");
         // processDefineId_processInstanceId_taskInstanceId
         taskProps.setTaskAppId(taskAppId);
         // set tenant -> task execute linux user
         taskProps.setTenantCode(tenantCode);
         taskProps.setTaskStartTime(new Date());
         taskProps.setTaskTimeout(360000);
-        taskProps.setTaskInstId(taskInstId);
-        taskProps.setNodeName(nodeName);
+        taskProps.setTaskInstanceId(taskInstId);
+        taskProps.setTaskName(nodeName);
         taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS);
 
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
index bd7f275..d1b82f2 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
@@ -71,9 +71,9 @@ public class DataxTaskTest {
         
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
         TaskProps props = new TaskProps();
-        props.setTaskDir("/tmp");
+        props.setExecutePath("/tmp");
         props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
-        props.setTaskInstId(1);
+        props.setTaskInstanceId(1);
         props.setTenantCode("1");
         props.setEnvFile(".dolphinscheduler_env.sh");
         props.setTaskStartTime(new Date());
@@ -87,8 +87,8 @@ public class DataxTaskTest {
         
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
         
Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
 
-        String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), 
props.getTaskAppId());
-        Mockito.when(shellCommandExecutor.run(fileName, 
processService)).thenReturn(0);
+        String fileName = String.format("%s/%s_node.sh", 
props.getExecutePath(), props.getTaskAppId());
+        Mockito.when(shellCommandExecutor.run(fileName)).thenReturn(null);
     }
 
     private DataSource getDataSource() {
@@ -118,9 +118,9 @@ public class DataxTaskTest {
     public void testDataxTask()
             throws Exception {
         TaskProps props = new TaskProps();
-        props.setTaskDir("/tmp");
+        props.setExecutePath("/tmp");
         props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
-        props.setTaskInstId(1);
+        props.setTaskInstanceId(1);
         props.setTenantCode("1");
         Assert.assertNotNull(new DataxTask(props, logger));
     }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
index 272fb54..a6a2587 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
@@ -50,7 +50,7 @@ public class DependentTaskTest {
                 "\"relation\":\"OR\"\n" +
                 "}";
 
-        taskProps.setTaskInstId(252612);
+        taskProps.setTaskInstanceId(252612);
         taskProps.setDependence(dependString);
         DependentTask dependentTask = new DependentTask(taskProps, logger);
         dependentTask.init();
diff --git 
a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue 
b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
index dbe3e1d..7874b53 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
@@ -170,7 +170,7 @@
        */
       _downloadLog () {
         downloadFile('/dolphinscheduler/log/download-log', {
-          taskInstId: this.stateId || this.logId
+          taskInstanceId: this.stateId || this.logId
         })
       },
       /**
@@ -256,7 +256,7 @@
     computed: {
       _rtParam () {
         return {
-          taskInstId: this.stateId || this.logId,
+          taskInstanceId: this.stateId || this.logId,
           skipLineNum: parseInt(`${this.loadingIndex ? this.loadingIndex + 
'000' : 0}`),
           limit: parseInt(`${this.loadingIndex ? this.loadingIndex + 1 : 
1}000`)
         }

Reply via email to