This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 12dd60fa46 Fix task wake up failed will block the event (#13466)
12dd60fa46 is described below
commit 12dd60fa46046659021d1c93d4af2849f22accdf
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Feb 1 15:31:04 2023 +0800
Fix task wake up failed will block the event (#13466)
---
.../apache/dolphinscheduler/common/enums/StateEventType.java | 2 +-
.../server/master/event/TaskWaitTaskGroupStateHandler.java | 10 ++++++----
.../server/master/processor/TaskEventProcessor.java | 2 +-
.../server/master/runner/WorkflowExecuteRunnable.java | 2 +-
.../server/master/runner/WorkflowExecuteThreadPool.java | 2 +-
.../dolphinscheduler/service/process/ProcessServiceImpl.java | 1 +
6 files changed, 11 insertions(+), 8 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
index bc021e5e08..5afadaaf06 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
@@ -25,7 +25,7 @@ public enum StateEventType {
TASK_STATE_CHANGE(1, "task state change"),
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"),
- WAIT_TASK_GROUP(4, "wait task group"),
+ WAKE_UP_TASK_GROUP(4, "wait task group"),
TASK_RETRY(5, "task retry"),
PROCESS_BLOCKED(6, "process blocked");
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
index bb0c7b8b70..b5fd02258e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
@@ -32,16 +32,18 @@ public class TaskWaitTaskGroupStateHandler implements
StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable,
- StateEvent stateEvent) throws
StateEventHandleFailure {
+ StateEvent stateEvent) {
logger.info("Handle task instance wait task group event,
taskInstanceId: {}", stateEvent.getTaskInstanceId());
- if (!workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
- throw new StateEventHandleFailure("Task state event handle failed
due to robing taskGroup resource failed");
+ if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
+ logger.info("Success wake up task instance, taskInstanceId: {}",
stateEvent.getTaskInstanceId());
+ } else {
+ logger.info("Failed to wake up task instance, taskInstanceId: {}",
stateEvent.getTaskInstanceId());
}
return true;
}
@Override
public StateEventType getEventType() {
- return StateEventType.WAIT_TASK_GROUP;
+ return StateEventType.WAKE_UP_TASK_GROUP;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index 43822fce58..e5f0e0f099 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -58,7 +58,7 @@ public class TaskEventProcessor implements
NettyRequestProcessor {
.processInstanceId(taskEventChangeCommand.getProcessInstanceId())
.taskInstanceId(taskEventChangeCommand.getTaskInstanceId())
.key(taskEventChangeCommand.getKey())
- .type(StateEventType.WAIT_TASK_GROUP)
+ .type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 5f981cf92e..7c75a6517f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -482,7 +482,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
TaskStateEvent nextEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())
.taskInstanceId(nextTaskInstance.getId())
- .type(StateEventType.WAIT_TASK_GROUP)
+ .type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
this.stateEvents.add(nextEvent);
} else {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 509028277a..be4d55eeed 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -109,7 +109,7 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
return;
}
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
- logger.warn("The workflow has been executed by another thread");
+ logger.debug("The workflow has been executed by another thread");
return;
}
multiThreadFilterMap.put(workflowExecuteThread.getKey(),
workflowExecuteThread);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 3591bec514..c2dad29de1 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2637,6 +2637,7 @@ public class ProcessServiceImpl implements ProcessService
{
processInstance.getId(), taskId);
Host host = new Host(processInstance.getHost());
stateEventCallbackService.sendResult(host,
taskEventChangeCommand.convert2Command(taskType));
+ logger.info("Success send command to master: {}, command: {}", host,
taskEventChangeCommand);
}
@Override