This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7bf3e3cdd6 [improve-#13201] update pid during running (#13206)
7bf3e3cdd6 is described below
commit 7bf3e3cdd66a68ac8d155b08fbd70365aa3a306b
Author: fuchanghai <[email protected]>
AuthorDate: Wed Feb 22 14:22:49 2023 +0800
[improve-#13201] update pid during running (#13206)
---
.../common/enums/TaskEventType.java | 1 +
.../master/event/TaskUpdatePidEventHandler.java | 103 +++++++++++++++++++++
.../master/processor/TaskUpdatePidProcessor.java | 66 +++++++++++++
.../server/master/processor/queue/TaskEvent.java | 13 +++
.../server/master/rpc/MasterRPCServer.java | 5 +
.../remote/command/CommandType.java | 10 ++
.../remote/command/TaskUpdatePidAckMessage.java | 55 +++++++++++
.../remote/command/TaskUpdatePidCommand.java | 83 +++++++++++++++++
.../plugin/task/api/AbstractCommandExecutor.java | 7 +-
.../plugin/task/api/AbstractYarnTask.java | 2 +-
.../plugin/task/api/TaskCallBack.java | 2 +
.../plugin/task/chunjun/ChunJunTask.java | 2 +-
.../plugin/task/datax/DataxTask.java | 2 +-
.../plugin/task/datax/DataxTaskTest.java | 23 ++++-
.../dolphinscheduler/plugin/task/dvc/DvcTask.java | 2 +-
.../plugin/task/emr/EmrAddStepsTaskTest.java | 12 ++-
.../plugin/task/emr/EmrJobFlowTaskTest.java | 12 ++-
.../plugin/task/hivecli/HiveCliTask.java | 2 +-
.../plugin/task/java/JavaTask.java | 2 +-
.../plugin/task/java/JavaTaskTest.java | 12 ++-
.../plugin/task/jupyter/JupyterTask.java | 2 +-
.../plugin/task/linkis/LinkisTask.java | 6 +-
.../plugin/task/mlflow/MlflowTask.java | 2 +-
.../plugin/task/python/PythonTask.java | 2 +-
.../plugin/task/pytorch/PytorchTask.java | 2 +-
.../plugin/task/seatunnel/SeatunnelTask.java | 2 +-
.../plugin/task/shell/ShellTask.java | 2 +-
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 12 ++-
.../worker/message/TaskUpdatePidMessageSender.java | 65 +++++++++++++
.../processor/TaskUpdatePidAckProcessor.java | 71 ++++++++++++++
.../server/worker/rpc/WorkerRpcClient.java | 8 ++
.../server/worker/rpc/WorkerRpcServer.java | 8 ++
.../server/worker/runner/TaskCallbackImpl.java | 14 +++
33 files changed, 587 insertions(+), 25 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
index f24f168679..371694a051 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
@@ -22,6 +22,7 @@ public enum TaskEventType {
DELAY,
RUNNING,
RESULT,
+ UPDATE_PID,
WORKER_REJECT,
CACHE,
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
new file mode 100644
index 0000000000..1f81139ae2
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage;
+import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskUpdatePidEventHandler implements TaskEventHandler {
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ @Autowired
+ private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+ @Autowired
+ private TaskInstanceDao taskInstanceDao;
+
+ @Override
+ public void handleTaskEvent(TaskEvent taskEvent) throws
TaskEventHandleError {
+ int taskInstanceId = taskEvent.getTaskInstanceId();
+ int processInstanceId = taskEvent.getProcessInstanceId();
+
+ WorkflowExecuteRunnable workflowExecuteRunnable =
+
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ if (workflowExecuteRunnable == null) {
+ sendAckToWorker(taskEvent);
+ throw new TaskEventHandleError(
+ "Handle task running event error, cannot find related
workflow instance from cache, will discard this event");
+ }
+ Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+ if (!taskInstanceOptional.isPresent()) {
+ sendAckToWorker(taskEvent);
+ throw new TaskEventHandleError(
+ "Handle running event error, cannot find the taskInstance
from cache, will discord this event");
+ }
+ TaskInstance taskInstance = taskInstanceOptional.get();
+ if (taskInstance.getState().isFinished()) {
+ sendAckToWorker(taskEvent);
+ throw new TaskEventHandleError(
+ "Handle task running event error, this task instance is
already finished, this event is delay, will discard this event");
+ }
+
+ TaskInstance oldTaskInstance = new TaskInstance();
+ TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
+ try {
+ taskInstance.setStartTime(taskEvent.getStartTime());
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ taskInstance.setPid(taskEvent.getProcessId());
+ if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
+ throw new TaskEventHandleError("Handle task running event
error, update taskInstance to db failed");
+ }
+ sendAckToWorker(taskEvent);
+ } catch (Exception ex) {
+ TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
+ if (ex instanceof TaskEventHandleError) {
+ throw ex;
+ }
+ throw new TaskEventHandleError("Handle task update pid event
error, update taskInstance to db failed", ex);
+ }
+
+ }
+
+ private void sendAckToWorker(TaskEvent taskEvent) {
+ // Send the ack to the worker, also when the event is discarded; otherwise the worker
will retry this event
+ TaskUpdatePidAckMessage taskUpdatePidAckMessage =
+ new TaskUpdatePidAckMessage(true,
taskEvent.getTaskInstanceId());
+
taskEvent.getChannel().writeAndFlush(taskUpdatePidAckMessage.convert2Command());
+ }
+
+ @Override
+ public TaskEventType getHandleEventType() {
+ return TaskEventType.UPDATE_PID;
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java
new file mode 100644
index 0000000000..bca59a079f
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
+/**
+ * task update pid processor
+ */
+@Component
+@Slf4j
+public class TaskUpdatePidProcessor implements NettyRequestProcessor {
+
+ @Autowired
+ private TaskEventService taskEventService;
+
+ /**
+ * process the task update pid command
+ *
+ * @param channel channel
+ * @param command command TaskUpdatePidCommand
+ */
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.TASK_UPDATE_PID ==
command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+ TaskUpdatePidCommand taskUpdatePidCommand =
+ JSONUtils.parseObject(command.getBody(),
TaskUpdatePidCommand.class);
+ log.info("taskUpdatePidCommand: {}", taskUpdatePidCommand);
+
+ TaskEvent taskEvent = TaskEvent.newUpdatePidEvent(taskUpdatePidCommand,
+ channel,
+ taskUpdatePidCommand.getMessageSenderAddress());
+ taskEventService.addEvent(taskEvent);
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 2573707ec0..1d2827223c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -23,6 +23,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
import java.util.Date;
@@ -161,4 +162,16 @@ public class TaskEvent {
event.setEvent(TaskEventType.CACHE);
return event;
}
+
+ public static TaskEvent newUpdatePidEvent(TaskUpdatePidCommand command,
Channel channel, String workerAddress) {
+ TaskEvent event = new TaskEvent();
+ event.setProcessInstanceId(command.getProcessInstanceId());
+ event.setTaskInstanceId(command.getTaskInstanceId());
+ event.setStartTime(DateUtils.timeStampToDate(command.getStartTime()));
+ event.setLogPath(command.getLogPath());
+ event.setChannel(channel);
+ event.setWorkerAddress(workerAddress);
+ event.setEvent(TaskEventType.UPDATE_PID);
+ return event;
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 1a070c10af..a72d73b849 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningPro
import
org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
+import
org.apache.dolphinscheduler.server.master.processor.TaskUpdatePidProcessor;
import
org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
import lombok.extern.slf4j.Slf4j;
@@ -67,6 +68,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
+ @Autowired
+ private TaskUpdatePidProcessor updatePidProcessor;
+
@Autowired
private TaskRecallProcessor taskRecallProcessor;
@@ -86,6 +90,7 @@ public class MasterRPCServer implements AutoCloseable {
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING,
taskExecuteRunningProcessor);
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID,
updatePidProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT,
taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE,
taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST,
stateEventProcessor);
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 920332f4a5..0ee631ae28 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -131,6 +131,16 @@ public enum CommandType {
*/
WORKFLOW_EXECUTING_DATA_REQUEST,
+ /**
+ * update taskInstance's PID request
+ */
+ TASK_UPDATE_PID,
+
+ /**
+ * update taskInstance's PID response ack, from master to worker
+ */
+ TASK_UPDATE_PID_ACK,
+
/**
* workflow executing data response, from master to api
*/
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java
new file mode 100644
index 0000000000..4f878c5c55
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * task update pid ack message
+ * from master to worker
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class TaskUpdatePidAckMessage implements Serializable {
+
+ private boolean success;
+ private int taskInstanceId;
+
+ /**
+ * package response command
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.TASK_UPDATE_PID_ACK);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java
new file mode 100644
index 0000000000..91e5787bd5
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * Task update pid message, the request to update the taskInstance's PID.
+ */
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskUpdatePidCommand extends BaseCommand {
+
+ /**
+ * taskInstanceId
+ */
+ private int taskInstanceId;
+
+ /**
+ * process instance id
+ */
+ private int processInstanceId;
+
+ /**
+ * startTime
+ */
+ private long startTime;
+
+ /**
+ * host
+ */
+ private String host;
+
+ /**
+ * logPath
+ */
+ private String logPath;
+
+ /**
+ * processId
+ */
+ private int processId;
+
+ public TaskUpdatePidCommand(String messageSenderAddress, String
messageReceiverAddress, long messageSendTime) {
+ super(messageSenderAddress, messageReceiverAddress, messageSendTime);
+ }
+
+ /**
+ * package request command
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.TASK_UPDATE_PID);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index e2e0ce9338..c3e9b952ad 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -186,7 +186,7 @@ public abstract class AbstractCommandExecutor {
command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
}
- public TaskResponse run(String execCommand) throws IOException,
InterruptedException {
+ public TaskResponse run(String execCommand, TaskCallBack taskCallBack)
throws IOException, InterruptedException {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null ==
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
@@ -228,6 +228,11 @@ public abstract class AbstractCommandExecutor {
// if timeout occurs, exit directly
long remainTime = getRemainTime();
+ // update pid before waiting for the run to finish
+ if (null != taskCallBack) {
+ taskCallBack.updateTaskInstanceInfo(taskInstanceId);
+ }
+
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 68993dc9dc..85ab9a610f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractYarnTask extends
AbstractRemoteTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// SHELL task exit code
- TaskResponse response = shellCommandExecutor.run(buildCommand());
+ TaskResponse response = shellCommandExecutor.run(buildCommand(),
taskCallBack);
setExitStatusCode(response.getExitStatusCode());
// set appIds
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
index 71643d56cd..b1a85852dc 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
@@ -22,4 +22,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
public interface TaskCallBack {
public void updateRemoteApplicationInfo(int taskInstanceId,
ApplicationInfo applicationInfo);
+
+ public void updateTaskInstanceInfo(int taskInstanceId);
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index 6a832488c4..7f09243104 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -115,7 +115,7 @@ public class ChunJunTask extends AbstractTask {
String jsonFilePath = buildChunJunJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath,
paramsMap);
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellCommandFilePath);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 53bd2a1871..dfa45ff5d2 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -159,7 +159,7 @@ public class DataxTask extends AbstractTask {
// run datax processDataSourceService
String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath,
paramsMap);
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellCommandFilePath);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
index 08c8d5466e..7368b6208f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
@@ -67,7 +68,17 @@ public class DataxTaskTest {
private DataxTask dataxTask;
- private final TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+ private final TaskCallBack taskCallBack = new TaskCallBack() {
+
+ @Override
+ public void updateRemoteApplicationInfo(int taskInstanceId,
ApplicationInfo applicationInfo) {
+
+ }
+
+ @Override
+ public void updateTaskInstanceInfo(int taskInstanceId) {
+
+ }
};
@BeforeEach
@@ -99,7 +110,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
- when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
+ when(shellCommandExecutor.run(anyString(),
eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -140,7 +151,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
- when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
+ when(shellCommandExecutor.run(anyString(),
eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -176,7 +187,8 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
- when(shellCommandExecutor.run(anyString())).thenThrow(new
InterruptedException("Command execution failed"));
+ when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+ .thenThrow(new InterruptedException("Command execution
failed"));
Assertions.assertThrows(TaskException.class, () ->
dataxTask.handle(taskCallBack));
}
@@ -194,7 +206,8 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
- when(shellCommandExecutor.run(anyString())).thenThrow(new
IOException("Command execution failed"));
+ when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+ .thenThrow(new IOException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () ->
dataxTask.handle(taskCallBack));
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 32d1b48f75..37d2d2f8f9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -79,7 +79,7 @@ public class DvcTask extends AbstractTask {
try {
// construct process
String command = buildCommand();
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(command);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
index 2dabd12622..b1fab4d436 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.commons.io.IOUtils;
@@ -74,8 +75,17 @@ public class EmrAddStepsTaskTest {
private EmrAddStepsTask emrAddStepsTask;
private AmazonElasticMapReduce emrClient;
private Step step;
- private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+ private TaskCallBack taskCallBack = new TaskCallBack() {
+ @Override
+ public void updateRemoteApplicationInfo(int taskInstanceId,
ApplicationInfo applicationInfo) {
+
+ }
+
+ @Override
+ public void updateTaskInstanceInfo(int taskInstanceId) {
+
+ }
};
@BeforeEach
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
index 1d5c90ba16..fc50f7ed97 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.commons.io.IOUtils;
@@ -96,8 +97,17 @@ public class EmrJobFlowTaskTest {
private EmrJobFlowTask emrJobFlowTask;
private AmazonElasticMapReduce emrClient;
private Cluster cluster;
- private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+ private TaskCallBack taskCallBack = new TaskCallBack() {
+ @Override
+ public void updateRemoteApplicationInfo(int taskInstanceId,
ApplicationInfo applicationInfo) {
+
+ }
+
+ @Override
+ public void updateTaskInstanceInfo(int taskInstanceId) {
+
+ }
};
@BeforeEach
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index efd12b682a..f76e046c17 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -76,7 +76,7 @@ public class HiveCliTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
- final TaskResponse taskResponse =
shellCommandExecutor.run(buildCommand());
+ final TaskResponse taskResponse =
shellCommandExecutor.run(buildCommand(), taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index 19083de50f..8628c01ea6 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -142,7 +142,7 @@ public class JavaTask extends AbstractTask {
throw new RunTypeNotFoundException("run type is required,
but it is null now.");
}
Preconditions.checkNotNull(command, "command not be null.");
- TaskResponse taskResponse = shellCommandExecutor.run(command);
+ TaskResponse taskResponse = shellCommandExecutor.run(command,
taskCallBack);
log.info("java task run result: {}", taskResponse);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
index 7fdc6b2917..c9193c397a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
@@ -25,6 +25,7 @@ import static
org.apache.dolphinscheduler.plugin.task.java.JavaConstants.RUN_TYP
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException;
@@ -44,8 +45,17 @@ import org.junit.jupiter.api.Test;
public class JavaTaskTest {
- private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+ private TaskCallBack taskCallBack = new TaskCallBack() {
+ @Override
+ public void updateRemoteApplicationInfo(int taskInstanceId,
ApplicationInfo applicationInfo) {
+
+ }
+
+ @Override
+ public void updateTaskInstanceInfo(int taskInstanceId) {
+
+ }
};
@Test
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 61fc07094b..f8d13b77b3 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -83,7 +83,7 @@ public class JupyterTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
- TaskResponse response = shellCommandExecutor.run(buildCommand());
+ TaskResponse response = shellCommandExecutor.run(buildCommand(),
taskCallBack);
setExitStatusCode(response.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index d6a1b8243d..acb54c9ebd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -102,7 +102,7 @@ public class LinkisTask extends AbstractRemoteTask {
try {
// construct process
String command = buildCommand();
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(command);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(command, null);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId());
@@ -128,7 +128,7 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.STATUS_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(command);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(command, null);
String status = findStatus(commandExecuteResult.getResultString());
LinkisJobStatus jobStatus =
LinkisJobStatus.convertFromJobStatusString(status);
switch (jobStatus) {
@@ -161,7 +161,7 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.KILL_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
- shellCommandExecutor.run(command);
+ shellCommandExecutor.run(command, null);
setExitStatusCode(EXIT_CODE_KILL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 7b252c8fd5..cd2487920a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -116,7 +116,7 @@ public class MlflowTask extends AbstractTask {
try {
// construct process
String command = buildCommand();
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(command);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(command, taskCallBack);
int exitCode;
if (mlflowParameters.getIsDeployDocker()) {
exitCode = checkDockerHealth();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index c3b33d064d..cb2cdfdcdc 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -110,7 +110,7 @@ public class PythonTask extends AbstractTask {
createPythonCommandFileIfNotExists(pythonScriptContent,
pythonScriptFile);
String command = buildPythonExecuteCommand(pythonScriptFile);
- TaskResponse taskResponse = shellCommandExecutor.run(command);
+ TaskResponse taskResponse = shellCommandExecutor.run(command,
taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index d48ae11473..f4f6d43a68 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -69,7 +69,7 @@ public class PytorchTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
String command = buildPythonExecuteCommand();
- TaskResponse taskResponse = shellCommandExecutor.run(command);
+ TaskResponse taskResponse = shellCommandExecutor.run(command,
taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 5b9e1ca2fc..975f8d8106 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -99,7 +99,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
try {
// construct process
String command = buildCommand();
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(command);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index b5dd987547..5c1ea5991d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -90,7 +90,7 @@ public class ShellTask extends AbstractTask {
try {
// construct process
String command = buildCommand();
- TaskResponse commandExecuteResult =
shellCommandExecutor.run(command);
+ TaskResponse commandExecuteResult =
shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 8d9e585dc3..8c5cb11377 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
@@ -64,8 +65,17 @@ public class ZeppelinTaskTest {
private ZeppelinTask zeppelinTask;
private ParagraphResult paragraphResult;
private NoteResult noteResult;
- private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+ private TaskCallBack taskCallBack = new TaskCallBack() {
+ @Override
+ public void updateRemoteApplicationInfo(int taskInstanceId,
ApplicationInfo applicationInfo) {
+
+ }
+
+ @Override
+ public void updateTaskInstanceInfo(int taskInstanceId) {
+
+ }
};
@BeforeEach
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java
new file mode 100644
index 0000000000..e50c00df7a
--- /dev/null
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import lombok.NonNull;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskUpdatePidMessageSender implements
MessageSender<TaskUpdatePidCommand> {
+
+ @Autowired
+ private WorkerRpcClient workerRpcClient;
+
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ @Override
+ public void sendMessage(TaskUpdatePidCommand message) throws
RemotingException {
+ workerRpcClient.send(Host.of(message.getMessageReceiverAddress()),
message.convert2Command());
+ }
+
+ @Override
+ public TaskUpdatePidCommand buildMessage(@NonNull TaskExecutionContext
taskExecutionContext,
+ @NonNull String
messageReceiverAddress) {
+ TaskUpdatePidCommand taskUpdatePidCommand =
+ new TaskUpdatePidCommand(workerConfig.getWorkerAddress(),
+ messageReceiverAddress,
+ System.currentTimeMillis());
+
taskUpdatePidCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
taskUpdatePidCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+ taskUpdatePidCommand.setHost(taskExecutionContext.getHost());
+ taskUpdatePidCommand.setStartTime(taskExecutionContext.getStartTime());
+ return taskUpdatePidCommand;
+ }
+
+ @Override
+ public CommandType getMessageType() {
+ return CommandType.TASK_UPDATE_PID;
+ }
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java
new file mode 100644
index 0000000000..af3680d0eb
--- /dev/null
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+
+import javax.annotation.Resource;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
+/**
+ * task update pid ack processor: on a successful ack, removes the pending TASK_UPDATE_PID retry message
+ */
+@Component
+@Slf4j
+public class TaskUpdatePidAckProcessor implements NettyRequestProcessor {
+
+ @Resource
+ private MessageRetryRunner messageRetryRunner;
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.TASK_UPDATE_PID_ACK ==
command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+
+ TaskUpdatePidAckMessage updatePidAckCommand =
JSONUtils.parseObject(command.getBody(),
+ TaskUpdatePidAckMessage.class);
+ if (updatePidAckCommand == null) {
+ log.error("task execute update pid ack command is null");
+ return;
+ }
+ try {
+
LogUtils.setTaskInstanceIdMDC(updatePidAckCommand.getTaskInstanceId());
+ log.info("task execute update pid ack command : {}",
updatePidAckCommand);
+
+ if (updatePidAckCommand.isSuccess()) {
+
messageRetryRunner.removeRetryMessage(updatePidAckCommand.getTaskInstanceId(),
+ CommandType.TASK_UPDATE_PID);
+ }
+ } finally {
+ LogUtils.removeTaskInstanceIdMDC();
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
index 4380f85485..a6c7063622 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
@@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
+import
org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor;
+
+import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -45,6 +48,9 @@ public class WorkerRpcClient implements AutoCloseable {
@Autowired
private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
+ @Resource
+ private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor;
+
@Autowired
private TaskRejectAckProcessor taskRejectAckProcessor;
@@ -57,6 +63,8 @@ public class WorkerRpcClient implements AutoCloseable {
// we only use the client to handle the ack message, we can optimize
this, send ack to the nettyServer.
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor);
+
this.nettyRemotingClient.registerProcessor(CommandType.TASK_UPDATE_PID_ACK,
+ taskUpdatePidAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK,
taskExecuteResultAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK,
taskRejectAckProcessor);
log.info("Worker rpc client started");
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 929e253553..6af593460b 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -29,9 +29,12 @@ import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAck
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor;
+import
org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor;
import java.io.Closeable;
+import javax.annotation.Resource;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -56,6 +59,9 @@ public class WorkerRpcServer implements Closeable {
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
+ @Resource
+ private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor;
+
@Autowired
private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
@@ -79,6 +85,8 @@ public class WorkerRpcServer implements Closeable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST,
taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor);
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID_ACK,
+ taskUpdatePidAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK,
taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK,
taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
hostUpdateProcessor);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
index 9ee176b3f9..fac29f79e0 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
@@ -59,4 +59,18 @@ public class TaskCallbackImpl implements TaskCallBack {
taskExecutionContext.setAppIds(applicationInfo.getAppIds());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress, CommandType.TASK_EXECUTE_RUNNING);
}
+
+ @Override
+ public void updateTaskInstanceInfo(int taskInstanceId) {
+ TaskExecutionContext taskExecutionContext =
+
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+ if (taskExecutionContext == null) {
+ log.error("task execution context is empty, taskInstanceId: {}",
taskInstanceId);
+ return;
+ }
+
+ log.info("send remote taskExecutionContext info {}",
taskExecutionContext);
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress, CommandType.TASK_UPDATE_PID);
+ }
+
}