This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7bf3e3cdd6 [improve-#13201] update pid during running (#13206)
7bf3e3cdd6 is described below

commit 7bf3e3cdd66a68ac8d155b08fbd70365aa3a306b
Author: fuchanghai <[email protected]>
AuthorDate: Wed Feb 22 14:22:49 2023 +0800

    [improve-#13201] update pid during running (#13206)
---
 .../common/enums/TaskEventType.java                |   1 +
 .../master/event/TaskUpdatePidEventHandler.java    | 103 +++++++++++++++++++++
 .../master/processor/TaskUpdatePidProcessor.java   |  66 +++++++++++++
 .../server/master/processor/queue/TaskEvent.java   |  13 +++
 .../server/master/rpc/MasterRPCServer.java         |   5 +
 .../remote/command/CommandType.java                |  10 ++
 .../remote/command/TaskUpdatePidAckMessage.java    |  55 +++++++++++
 .../remote/command/TaskUpdatePidCommand.java       |  83 +++++++++++++++++
 .../plugin/task/api/AbstractCommandExecutor.java   |   7 +-
 .../plugin/task/api/AbstractYarnTask.java          |   2 +-
 .../plugin/task/api/TaskCallBack.java              |   2 +
 .../plugin/task/chunjun/ChunJunTask.java           |   2 +-
 .../plugin/task/datax/DataxTask.java               |   2 +-
 .../plugin/task/datax/DataxTaskTest.java           |  23 ++++-
 .../dolphinscheduler/plugin/task/dvc/DvcTask.java  |   2 +-
 .../plugin/task/emr/EmrAddStepsTaskTest.java       |  12 ++-
 .../plugin/task/emr/EmrJobFlowTaskTest.java        |  12 ++-
 .../plugin/task/hivecli/HiveCliTask.java           |   2 +-
 .../plugin/task/java/JavaTask.java                 |   2 +-
 .../plugin/task/java/JavaTaskTest.java             |  12 ++-
 .../plugin/task/jupyter/JupyterTask.java           |   2 +-
 .../plugin/task/linkis/LinkisTask.java             |   6 +-
 .../plugin/task/mlflow/MlflowTask.java             |   2 +-
 .../plugin/task/python/PythonTask.java             |   2 +-
 .../plugin/task/pytorch/PytorchTask.java           |   2 +-
 .../plugin/task/seatunnel/SeatunnelTask.java       |   2 +-
 .../plugin/task/shell/ShellTask.java               |   2 +-
 .../plugin/task/zeppelin/ZeppelinTaskTest.java     |  12 ++-
 .../worker/message/TaskUpdatePidMessageSender.java |  65 +++++++++++++
 .../processor/TaskUpdatePidAckProcessor.java       |  71 ++++++++++++++
 .../server/worker/rpc/WorkerRpcClient.java         |   8 ++
 .../server/worker/rpc/WorkerRpcServer.java         |   8 ++
 .../server/worker/runner/TaskCallbackImpl.java     |  14 +++
 33 files changed, 587 insertions(+), 25 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
index f24f168679..371694a051 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
@@ -22,6 +22,7 @@ public enum TaskEventType {
     DELAY,
     RUNNING,
     RESULT,
+    UPDATE_PID,
     WORKER_REJECT,
     CACHE,
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
new file mode 100644
index 0000000000..1f81139ae2
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage;
+import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskUpdatePidEventHandler implements TaskEventHandler {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) throws 
TaskEventHandleError {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+                
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable == null) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                    "Handle task running event error, cannot find related 
workflow instance from cache, will discard this event");
+        }
+        Optional<TaskInstance> taskInstanceOptional = 
workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+        if (!taskInstanceOptional.isPresent()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                    "Handle running event error, cannot find the taskInstance 
from cache, will discord this event");
+        }
+        TaskInstance taskInstance = taskInstanceOptional.get();
+        if (taskInstance.getState().isFinished()) {
+            sendAckToWorker(taskEvent);
+            throw new TaskEventHandleError(
+                    "Handle task running event error, this task instance is 
already finished, this event is delay, will discard this event");
+        }
+
+        TaskInstance oldTaskInstance = new TaskInstance();
+        TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+        try {
+            taskInstance.setStartTime(taskEvent.getStartTime());
+            taskInstance.setHost(taskEvent.getWorkerAddress());
+            taskInstance.setPid(taskEvent.getProcessId());
+            if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
+                throw new TaskEventHandleError("Handle task running event 
error, update taskInstance to db failed");
+            }
+            sendAckToWorker(taskEvent);
+        } catch (Exception ex) {
+            TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+            if (ex instanceof TaskEventHandleError) {
+                throw ex;
+            }
+            throw new TaskEventHandleError("Handle task update pid  event 
error, update taskInstance to db failed", ex);
+        }
+
+    }
+
+    private void sendAckToWorker(TaskEvent taskEvent) {
+        // If event handle success, send ack to worker to otherwise the worker 
will retry this event
+        TaskUpdatePidAckMessage taskUpdatePidAckMessage =
+                new TaskUpdatePidAckMessage(true, 
taskEvent.getTaskInstanceId());
+        
taskEvent.getChannel().writeAndFlush(taskUpdatePidAckMessage.convert2Command());
+    }
+
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.UPDATE_PID;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java
new file mode 100644
index 0000000000..bca59a079f
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
+/**
+ * task execute running processor
+ */
+@Component
+@Slf4j
+public class TaskUpdatePidProcessor implements NettyRequestProcessor {
+
+    @Autowired
+    private TaskEventService taskEventService;
+
+    /**
+     * task ack process
+     *
+     * @param channel channel channel
+     * @param command command TaskExecuteAckCommand
+     */
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_UPDATE_PID == 
command.getType(),
+                String.format("invalid command type : %s", command.getType()));
+        TaskUpdatePidCommand taskUpdatePidCommand =
+                JSONUtils.parseObject(command.getBody(), 
TaskUpdatePidCommand.class);
+        log.info("taskUpdatePidCommand: {}", taskUpdatePidCommand);
+
+        TaskEvent taskEvent = TaskEvent.newUpdatePidEvent(taskUpdatePidCommand,
+                channel,
+                taskUpdatePidCommand.getMessageSenderAddress());
+        taskEventService.addEvent(taskEvent);
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 2573707ec0..1d2827223c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -23,6 +23,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
 import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
 
 import java.util.Date;
 
@@ -161,4 +162,16 @@ public class TaskEvent {
         event.setEvent(TaskEventType.CACHE);
         return event;
     }
+
+    public static TaskEvent newUpdatePidEvent(TaskUpdatePidCommand command, 
Channel channel, String workerAddress) {
+        TaskEvent event = new TaskEvent();
+        event.setProcessInstanceId(command.getProcessInstanceId());
+        event.setTaskInstanceId(command.getTaskInstanceId());
+        event.setStartTime(DateUtils.timeStampToDate(command.getStartTime()));
+        event.setLogPath(command.getLogPath());
+        event.setChannel(channel);
+        event.setWorkerAddress(workerAddress);
+        event.setEvent(TaskEventType.UPDATE_PID);
+        return event;
+    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 1a070c10af..a72d73b849 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -30,6 +30,7 @@ import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningPro
 import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
+import 
org.apache.dolphinscheduler.server.master.processor.TaskUpdatePidProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
 
 import lombok.extern.slf4j.Slf4j;
@@ -67,6 +68,9 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private TaskKillResponseProcessor taskKillResponseProcessor;
 
+    @Autowired
+    private TaskUpdatePidProcessor updatePidProcessor;
+
     @Autowired
     private TaskRecallProcessor taskRecallProcessor;
 
@@ -86,6 +90,7 @@ public class MasterRPCServer implements AutoCloseable {
         serverConfig.setListenPort(masterConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, 
taskExecuteRunningProcessor);
+        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID, 
updatePidProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, 
taskExecuteResponseProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, 
taskKillResponseProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, 
stateEventProcessor);
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 920332f4a5..0ee631ae28 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -131,6 +131,16 @@ public enum CommandType {
      */
     WORKFLOW_EXECUTING_DATA_REQUEST,
 
+    /**
+     * update taskInstance's PID request
+     */
+    TASK_UPDATE_PID,
+
+    /**
+     * update taskInstance's PID response ack, from master to worker
+     */
+    TASK_UPDATE_PID_ACK,
+
     /**
      * workflow executing data response, from master to api
      */
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java
new file mode 100644
index 0000000000..4f878c5c55
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * task execute running ack command
+ * from master to worker
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class TaskUpdatePidAckMessage implements Serializable {
+
+    private boolean success;
+    private int taskInstanceId;
+
+    /**
+     * package response command
+     *
+     * @return command
+     */
+    public Command convert2Command() {
+        Command command = new Command();
+        command.setType(CommandType.TASK_UPDATE_PID_ACK);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java
new file mode 100644
index 0000000000..91e5787bd5
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * Task running message, means the task is running in worker.
+ */
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskUpdatePidCommand extends BaseCommand {
+
+    /**
+     * taskInstanceId
+     */
+    private int taskInstanceId;
+
+    /**
+     * process instance id
+     */
+    private int processInstanceId;
+
+    /**
+     * startTime
+     */
+    private long startTime;
+
+    /**
+     * host
+     */
+    private String host;
+
+    /**
+     * logPath
+     */
+    private String logPath;
+
+    /**
+     * processId
+     */
+    private int processId;
+
+    public TaskUpdatePidCommand(String messageSenderAddress, String 
messageReceiverAddress, long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
+    }
+
+    /**
+     * package request command
+     *
+     * @return command
+     */
+    public Command convert2Command() {
+        Command command = new Command();
+        command.setType(CommandType.TASK_UPDATE_PID);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index e2e0ce9338..c3e9b952ad 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -186,7 +186,7 @@ public abstract class AbstractCommandExecutor {
         command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
     }
 
-    public TaskResponse run(String execCommand) throws IOException, 
InterruptedException {
+    public TaskResponse run(String execCommand, TaskCallBack taskCallBack) 
throws IOException, InterruptedException {
         TaskResponse result = new TaskResponse();
         int taskInstanceId = taskRequest.getTaskInstanceId();
         if (null == 
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
@@ -228,6 +228,11 @@ public abstract class AbstractCommandExecutor {
         // if timeout occurs, exit directly
         long remainTime = getRemainTime();
 
+        // update pid before waiting for the run to finish
+        if (null != taskCallBack) {
+            taskCallBack.updateTaskInstanceInfo(taskInstanceId);
+        }
+
         // waiting for the run to finish
         boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 68993dc9dc..85ab9a610f 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractYarnTask extends 
AbstractRemoteTask {
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             // SHELL task exit code
-            TaskResponse response = shellCommandExecutor.run(buildCommand());
+            TaskResponse response = shellCommandExecutor.run(buildCommand(), 
taskCallBack);
             setExitStatusCode(response.getExitStatusCode());
             // set appIds
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
index 71643d56cd..b1a85852dc 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
@@ -22,4 +22,6 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 public interface TaskCallBack {
 
     public void updateRemoteApplicationInfo(int taskInstanceId, 
ApplicationInfo applicationInfo);
+
+    public void updateTaskInstanceInfo(int taskInstanceId);
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index 6a832488c4..7f09243104 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -115,7 +115,7 @@ public class ChunJunTask extends AbstractTask {
 
             String jsonFilePath = buildChunJunJsonFile(paramsMap);
             String shellCommandFilePath = buildShellCommandFile(jsonFilePath, 
paramsMap);
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellCommandFilePath);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
 
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 53bd2a1871..dfa45ff5d2 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -159,7 +159,7 @@ public class DataxTask extends AbstractTask {
             // run datax processDataSourceService
             String jsonFilePath = buildDataxJsonFile(paramsMap);
             String shellCommandFilePath = buildShellCommandFile(jsonFilePath, 
paramsMap);
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellCommandFilePath);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
 
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
index 08c8d5466e..7368b6208f 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
@@ -67,7 +68,17 @@ public class DataxTaskTest {
 
     private DataxTask dataxTask;
 
-    private final TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+    private final TaskCallBack taskCallBack = new TaskCallBack() {
+
+        @Override
+        public void updateRemoteApplicationInfo(int taskInstanceId, 
ApplicationInfo applicationInfo) {
+
+        }
+
+        @Override
+        public void updateTaskInstanceInfo(int taskInstanceId) {
+
+        }
     };
 
     @BeforeEach
@@ -99,7 +110,7 @@ public class DataxTaskTest {
         taskResponse.setStatus(TaskRunStatus.SUCCESS);
         taskResponse.setExitStatusCode(0);
         taskResponse.setProcessId(1);
-        when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
+        when(shellCommandExecutor.run(anyString(), 
eq(taskCallBack))).thenReturn(taskResponse);
 
         dataxTask.handle(taskCallBack);
         Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -140,7 +151,7 @@ public class DataxTaskTest {
         taskResponse.setStatus(TaskRunStatus.SUCCESS);
         taskResponse.setExitStatusCode(0);
         taskResponse.setProcessId(1);
-        when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
+        when(shellCommandExecutor.run(anyString(), 
eq(taskCallBack))).thenReturn(taskResponse);
 
         dataxTask.handle(taskCallBack);
         Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -176,7 +187,8 @@ public class DataxTaskTest {
         shellCommandExecutorFiled.setAccessible(true);
         shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
 
-        when(shellCommandExecutor.run(anyString())).thenThrow(new 
InterruptedException("Command execution failed"));
+        when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+                .thenThrow(new InterruptedException("Command execution 
failed"));
         Assertions.assertThrows(TaskException.class, () -> 
dataxTask.handle(taskCallBack));
     }
 
@@ -194,7 +206,8 @@ public class DataxTaskTest {
         shellCommandExecutorFiled.setAccessible(true);
         shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
 
-        when(shellCommandExecutor.run(anyString())).thenThrow(new 
IOException("Command execution failed"));
+        when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+                .thenThrow(new IOException("Command execution failed"));
         Assertions.assertThrows(TaskException.class, () -> 
dataxTask.handle(taskCallBack));
     }
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 32d1b48f75..37d2d2f8f9 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -79,7 +79,7 @@ public class DvcTask extends AbstractTask {
         try {
             // construct process
             String command = buildCommand();
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
             parameters.dealOutParam(shellCommandExecutor.getVarPool());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
index 2dabd12622..b1fab4d436 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 
 import org.apache.commons.io.IOUtils;
 
@@ -74,8 +75,17 @@ public class EmrAddStepsTaskTest {
     private EmrAddStepsTask emrAddStepsTask;
     private AmazonElasticMapReduce emrClient;
     private Step step;
-    private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+    private TaskCallBack taskCallBack = new TaskCallBack() {
 
+        @Override
+        public void updateRemoteApplicationInfo(int taskInstanceId, 
ApplicationInfo applicationInfo) {
+
+        }
+
+        @Override
+        public void updateTaskInstanceInfo(int taskInstanceId) {
+
+        }
     };
 
     @BeforeEach
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
index 1d5c90ba16..fc50f7ed97 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 
 import org.apache.commons.io.IOUtils;
 
@@ -96,8 +97,17 @@ public class EmrJobFlowTaskTest {
     private EmrJobFlowTask emrJobFlowTask;
     private AmazonElasticMapReduce emrClient;
     private Cluster cluster;
-    private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+    private TaskCallBack taskCallBack = new TaskCallBack() {
 
+        @Override
+        public void updateRemoteApplicationInfo(int taskInstanceId, 
ApplicationInfo applicationInfo) {
+
+        }
+
+        @Override
+        public void updateTaskInstanceInfo(int taskInstanceId) {
+
+        }
     };
 
     @BeforeEach
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index efd12b682a..f76e046c17 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -76,7 +76,7 @@ public class HiveCliTask extends AbstractRemoteTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
-            final TaskResponse taskResponse = 
shellCommandExecutor.run(buildCommand());
+            final TaskResponse taskResponse = 
shellCommandExecutor.run(buildCommand(), taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setAppIds(taskResponse.getAppIds());
             setProcessId(taskResponse.getProcessId());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index 19083de50f..8628c01ea6 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -142,7 +142,7 @@ public class JavaTask extends AbstractTask {
                     throw new RunTypeNotFoundException("run type is required, 
but it is null now.");
             }
             Preconditions.checkNotNull(command, "command not be null.");
-            TaskResponse taskResponse = shellCommandExecutor.run(command);
+            TaskResponse taskResponse = shellCommandExecutor.run(command, 
taskCallBack);
             log.info("java task run result: {}", taskResponse);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setAppIds(taskResponse.getAppIds());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
index 7fdc6b2917..c9193c397a 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.dolphinscheduler.plugin.task.java.JavaConstants.RUN_TYP
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import 
org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException;
@@ -44,8 +45,17 @@ import org.junit.jupiter.api.Test;
 
 public class JavaTaskTest {
 
-    private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+    private TaskCallBack taskCallBack = new TaskCallBack() {
 
+        @Override
+        public void updateRemoteApplicationInfo(int taskInstanceId, 
ApplicationInfo applicationInfo) {
+
+        }
+
+        @Override
+        public void updateTaskInstanceInfo(int taskInstanceId) {
+
+        }
     };
 
     @Test
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 61fc07094b..f8d13b77b3 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -83,7 +83,7 @@ public class JupyterTask extends AbstractRemoteTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
-            TaskResponse response = shellCommandExecutor.run(buildCommand());
+            TaskResponse response = shellCommandExecutor.run(buildCommand(), 
taskCallBack);
             setExitStatusCode(response.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(response.getProcessId());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index d6a1b8243d..acb54c9ebd 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -102,7 +102,7 @@ public class LinkisTask extends AbstractRemoteTask {
         try {
             // construct process
             String command = buildCommand();
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command, null);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(findTaskId(commandExecuteResult.getResultString()));
             setProcessId(commandExecuteResult.getProcessId());
@@ -128,7 +128,7 @@ public class LinkisTask extends AbstractRemoteTask {
             args.add(Constants.STATUS_OPTIONS);
             args.add(taskId);
             String command = String.join(Constants.SPACE, args);
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command, null);
             String status = findStatus(commandExecuteResult.getResultString());
             LinkisJobStatus jobStatus = 
LinkisJobStatus.convertFromJobStatusString(status);
             switch (jobStatus) {
@@ -161,7 +161,7 @@ public class LinkisTask extends AbstractRemoteTask {
             args.add(Constants.KILL_OPTIONS);
             args.add(taskId);
             String command = String.join(Constants.SPACE, args);
-            shellCommandExecutor.run(command);
+            shellCommandExecutor.run(command, null);
             setExitStatusCode(EXIT_CODE_KILL);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 7b252c8fd5..cd2487920a 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -116,7 +116,7 @@ public class MlflowTask extends AbstractTask {
         try {
             // construct process
             String command = buildCommand();
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command, taskCallBack);
             int exitCode;
             if (mlflowParameters.getIsDeployDocker()) {
                 exitCode = checkDockerHealth();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index c3b33d064d..cb2cdfdcdc 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -110,7 +110,7 @@ public class PythonTask extends AbstractTask {
             createPythonCommandFileIfNotExists(pythonScriptContent, 
pythonScriptFile);
             String command = buildPythonExecuteCommand(pythonScriptFile);
 
-            TaskResponse taskResponse = shellCommandExecutor.run(command);
+            TaskResponse taskResponse = shellCommandExecutor.run(command, 
taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setProcessId(taskResponse.getProcessId());
             setVarPool(shellCommandExecutor.getVarPool());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index d48ae11473..f4f6d43a68 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -69,7 +69,7 @@ public class PytorchTask extends AbstractTask {
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             String command = buildPythonExecuteCommand();
-            TaskResponse taskResponse = shellCommandExecutor.run(command);
+            TaskResponse taskResponse = shellCommandExecutor.run(command, 
taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setProcessId(taskResponse.getProcessId());
             setVarPool(shellCommandExecutor.getVarPool());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 5b9e1ca2fc..975f8d8106 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -99,7 +99,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
         try {
             // construct process
             String command = buildCommand();
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index b5dd987547..5c1ea5991d 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -90,7 +90,7 @@ public class ShellTask extends AbstractTask {
         try {
             // construct process
             String command = buildCommand();
-            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command);
+            TaskResponse commandExecuteResult = 
shellCommandExecutor.run(command, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
             shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 8d9e585dc3..8c5cb11377 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 
 import org.apache.zeppelin.client.NoteResult;
 import org.apache.zeppelin.client.ParagraphResult;
@@ -64,8 +65,17 @@ public class ZeppelinTaskTest {
     private ZeppelinTask zeppelinTask;
     private ParagraphResult paragraphResult;
     private NoteResult noteResult;
-    private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+    private TaskCallBack taskCallBack = new TaskCallBack() {
 
+        @Override
+        public void updateRemoteApplicationInfo(int taskInstanceId, 
ApplicationInfo applicationInfo) {
+
+        }
+
+        @Override
+        public void updateTaskInstanceInfo(int taskInstanceId) {
+
+        }
     };
 
     @BeforeEach
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java
new file mode 100644
index 0000000000..e50c00df7a
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import lombok.NonNull;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskUpdatePidMessageSender implements 
MessageSender<TaskUpdatePidCommand> {
+
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Override
+    public void sendMessage(TaskUpdatePidCommand message) throws 
RemotingException {
+        workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), 
message.convert2Command());
+    }
+
+    @Override
+    public TaskUpdatePidCommand buildMessage(@NonNull TaskExecutionContext 
taskExecutionContext,
+                                             @NonNull String 
messageReceiverAddress) {
+        TaskUpdatePidCommand taskUpdatePidCommand =
+                new TaskUpdatePidCommand(workerConfig.getWorkerAddress(),
+                        messageReceiverAddress,
+                        System.currentTimeMillis());
+        
taskUpdatePidCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        
taskUpdatePidCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        taskUpdatePidCommand.setHost(taskExecutionContext.getHost());
+        taskUpdatePidCommand.setStartTime(taskExecutionContext.getStartTime());
+        return taskUpdatePidCommand;
+    }
+
+    @Override
+    public CommandType getMessageType() {
+        return CommandType.TASK_UPDATE_PID;
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java
new file mode 100644
index 0000000000..af3680d0eb
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+
+import javax.annotation.Resource;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
+/**
+ * task execute running ack processor
+ */
+@Component
+@Slf4j
+public class TaskUpdatePidAckProcessor implements NettyRequestProcessor {
+
+    @Resource
+    private MessageRetryRunner messageRetryRunner;
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_UPDATE_PID_ACK == 
command.getType(),
+                String.format("invalid command type : %s", command.getType()));
+
+        TaskUpdatePidAckMessage updatePidAckCommand = 
JSONUtils.parseObject(command.getBody(),
+                TaskUpdatePidAckMessage.class);
+        if (updatePidAckCommand == null) {
+            log.error("task execute update pid ack command is null");
+            return;
+        }
+        try {
+            
LogUtils.setTaskInstanceIdMDC(updatePidAckCommand.getTaskInstanceId());
+            log.info("task execute update pid ack command : {}", 
updatePidAckCommand);
+
+            if (updatePidAckCommand.isSuccess()) {
+                
messageRetryRunner.removeRetryMessage(updatePidAckCommand.getTaskInstanceId(),
+                        CommandType.TASK_UPDATE_PID);
+            }
+        } finally {
+            LogUtils.removeTaskInstanceIdMDC();
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
index 4380f85485..a6c7063622 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
@@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
+import 
org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor;
+
+import javax.annotation.Resource;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -45,6 +48,9 @@ public class WorkerRpcClient implements AutoCloseable {
     @Autowired
     private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
 
+    @Resource
+    private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor;
+
     @Autowired
     private TaskRejectAckProcessor taskRejectAckProcessor;
 
@@ -57,6 +63,8 @@ public class WorkerRpcClient implements AutoCloseable {
         // we only use the client to handle the ack message, we can optimize 
this, send ack to the nettyServer.
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
                 taskExecuteRunningAckProcessor);
+        
this.nettyRemotingClient.registerProcessor(CommandType.TASK_UPDATE_PID_ACK,
+                taskUpdatePidAckProcessor);
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, 
taskExecuteResultAckProcessor);
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK, 
taskRejectAckProcessor);
         log.info("Worker rpc client started");
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 929e253553..6af593460b 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -29,9 +29,12 @@ import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAck
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor;
+import 
org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor;
 
 import java.io.Closeable;
 
+import javax.annotation.Resource;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -56,6 +59,9 @@ public class WorkerRpcServer implements Closeable {
     @Autowired
     private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
 
+    @Resource
+    private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor;
+
     @Autowired
     private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
 
@@ -79,6 +85,8 @@ public class WorkerRpcServer implements Closeable {
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, 
taskKillProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
                 taskExecuteRunningAckProcessor);
+        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID_ACK,
+                taskUpdatePidAckProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, 
taskExecuteResultAckProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, 
taskRejectAckProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
 hostUpdateProcessor);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
index 9ee176b3f9..fac29f79e0 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
@@ -59,4 +59,18 @@ public class TaskCallbackImpl implements TaskCallBack {
         taskExecutionContext.setAppIds(applicationInfo.getAppIds());
         workerMessageSender.sendMessageWithRetry(taskExecutionContext, 
masterAddress, CommandType.TASK_EXECUTE_RUNNING);
     }
+
+    @Override
+    public void updateTaskInstanceInfo(int taskInstanceId) {
+        TaskExecutionContext taskExecutionContext =
+                
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+        if (taskExecutionContext == null) {
+            log.error("task execution context is empty, taskInstanceId: {}", 
taskInstanceId);
+            return;
+        }
+
+        log.info("send remote taskExecutionContext info {}", 
taskExecutionContext);
+        workerMessageSender.sendMessageWithRetry(taskExecutionContext, 
masterAddress, CommandType.TASK_UPDATE_PID);
+    }
+
 }

Reply via email to