This is an automated email from the ASF dual-hosted git repository. kerwin pushed a commit to branch 3.1.9-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit f2789054fa74ce1f6984f4790a2435f2058455ea Author: fuchanghai <[email protected]> AuthorDate: Mon Feb 20 14:10:59 2023 +0800 cherry-pick after a submit failure, stop the processInstance to avoid an endless loop #13051 --- .../common/enums/StateEventType.java | 3 ++- .../master/event/WorkflowStartEventHandler.java | 29 ++++++++++++++-------- .../master/event/WorkflowStateEventHandler.java | 3 +++ 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java index 5afadaaf06..6e25d2ed4b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java @@ -27,7 +27,8 @@ public enum StateEventType { TASK_TIMEOUT(3, "task timeout"), WAKE_UP_TASK_GROUP(4, "wait task group"), TASK_RETRY(5, "task retry"), - PROCESS_BLOCKED(6, "process blocked"); + PROCESS_BLOCKED(6, "process blocked"), + PROCESS_SUBMIT_FAILED(7, "process submit failed"); StateEventType(int code, String descp) { this.code = code; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java index 8c5e27c117..0b43ae33fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.event; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; @@ -61,18 +63,23 @@ public class WorkflowStartEventHandler implements WorkflowEventHandler { ProcessInstanceMetrics.incProcessInstanceByState("submit"); ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool) - .thenAccept(workflowSubmitStatue -> { - if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) { - // submit failed will resend the event to workflow event queue - logger.info("Success submit the workflow instance"); - if (processInstance.getTimeout() > 0) { - stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); + .thenAccept(workflowSubmitStatue -> { + if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) { + logger.info("Success submit the workflow instance"); + if (processInstance.getTimeout() > 0) { + stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); + } + } else if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) { + logger.error( + "Failed to submit the workflow instance, will resend the workflow start event: {}", + workflowEvent); + WorkflowStateEvent stateEvent = WorkflowStateEvent.builder() + .processInstanceId(processInstance.getId()) + .type(StateEventType.PROCESS_SUBMIT_FAILED) + .status(WorkflowExecutionStatus.FAILURE) + .build(); + workflowExecuteRunnable.addStateEvent(stateEvent); } - } else { - logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}", - workflowEvent); - workflowEventQueue.addEvent(workflowEvent); - } }); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java index c3f49111a7..685e1bfb6e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java @@ -57,6 +57,9 @@ public class WorkflowStateEventHandler implements StateEventHandler { return true; } if (workflowStateEvent.getStatus().isFinished()) { + if (workflowStateEvent.getType().equals(StateEventType.PROCESS_SUBMIT_FAILED)) { + workflowExecuteRunnable.updateProcessInstanceState(workflowStateEvent); + } workflowExecuteRunnable.endProcess(); } if (processInstance.getState().isReadyStop()) {
