This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 1.3.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.4-prepare by this push:
new 656ccab cherry-pick task_ack_miss (#4198)
656ccab is described below
commit 656ccab9302cca7ddc9a37b1c5b4adcec6a1d838
Author: Kirs <[email protected]>
AuthorDate: Thu Dec 10 16:48:11 2020 +0800
cherry-pick task_ack_miss (#4198)
---
.../master/processor/queue/TaskResponseService.java | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 4cd7e8b..d09ed71 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -134,16 +134,17 @@ public class TaskResponseService {
case ACK:
try {
TaskInstance taskInstance =
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
- if (taskInstance != null && !taskInstance.getState().typeIsFinished()) {
- processService.changeTaskState(taskResponseEvent.getState(),
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath(),
- taskResponseEvent.getTaskInstanceId());
+ if (taskInstance != null) {
+ ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
+ processService.changeTaskState(status,
+ taskResponseEvent.getStartTime(),
+ taskResponseEvent.getWorkerAddress(),
+ taskResponseEvent.getExecutePath(),
+ taskResponseEvent.getLogPath(),
+ taskResponseEvent.getTaskInstanceId());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
- DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+ DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
}catch (Exception e){
logger.error("worker ack master error",e);