This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.9-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit f2789054fa74ce1f6984f4790a2435f2058455ea
Author: fuchanghai <[email protected]>
AuthorDate: Mon Feb 20 14:10:59 2023 +0800

    cherry-pick after a submit failure, stop the processInstance to avoid an endless loop #13051
---
 .../common/enums/StateEventType.java               |  3 ++-
 .../master/event/WorkflowStartEventHandler.java    | 29 ++++++++++++++--------
 .../master/event/WorkflowStateEventHandler.java    |  3 +++
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
index 5afadaaf06..6e25d2ed4b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
@@ -27,7 +27,8 @@ public enum StateEventType {
     TASK_TIMEOUT(3, "task timeout"),
     WAKE_UP_TASK_GROUP(4, "wait task group"),
     TASK_RETRY(5, "task retry"),
-    PROCESS_BLOCKED(6, "process blocked");
+    PROCESS_BLOCKED(6, "process blocked"),
+    PROCESS_SUBMIT_FAILED(7, "process submit failed");
 
     StateEventType(int code, String descp) {
         this.code = code;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
index 8c5e27c117..0b43ae33fd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.event;
 
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@@ -61,18 +63,23 @@ public class WorkflowStartEventHandler implements WorkflowEventHandler {
         ProcessInstanceMetrics.incProcessInstanceByState("submit");
         ProcessInstance processInstance = 
workflowExecuteRunnable.getProcessInstance();
         CompletableFuture.supplyAsync(workflowExecuteRunnable::call, 
workflowExecuteThreadPool)
-            .thenAccept(workflowSubmitStatue -> {
-                if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
-                    // submit failed will resend the event to workflow event 
queue
-                    logger.info("Success submit the workflow instance");
-                    if (processInstance.getTimeout() > 0) {
-                        
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+                .thenAccept(workflowSubmitStatue -> {
+                    if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
+                        logger.info("Success submit the workflow instance");
+                        if (processInstance.getTimeout() > 0) {
+                            
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+                        }
+                    } else if (WorkflowSubmitStatue.FAILED == 
workflowSubmitStatue) {
+                        logger.error(
+                                "Failed to submit the workflow instance, will 
resend the workflow start event: {}",
+                                workflowEvent);
+                        WorkflowStateEvent stateEvent = 
WorkflowStateEvent.builder()
+                                .processInstanceId(processInstance.getId())
+                                .type(StateEventType.PROCESS_SUBMIT_FAILED)
+                                .status(WorkflowExecutionStatus.FAILURE)
+                                .build();
+                        workflowExecuteRunnable.addStateEvent(stateEvent);
                     }
-                } else {
-                    logger.error("Failed to submit the workflow instance, will 
resend the workflow start event: {}",
-                                 workflowEvent);
-                    workflowEventQueue.addEvent(workflowEvent);
-                }
             });
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
index c3f49111a7..685e1bfb6e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
@@ -57,6 +57,9 @@ public class WorkflowStateEventHandler implements StateEventHandler {
             return true;
         }
         if (workflowStateEvent.getStatus().isFinished()) {
+            if 
(workflowStateEvent.getType().equals(StateEventType.PROCESS_SUBMIT_FAILED)) {
+                
workflowExecuteRunnable.updateProcessInstanceState(workflowStateEvent);
+            }
             workflowExecuteRunnable.endProcess();
         }
         if (processInstance.getState().isReadyStop()) {

Reply via email to