This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7ae4fb3787 [Fix-14149][master] task finished so not dispatch (#14161)
7ae4fb3787 is described below

commit 7ae4fb378779077c75cd6411ef8e3b68f232bd84
Author: eye-gu <[email protected]>
AuthorDate: Thu Aug 31 20:08:09 2023 +0800

    [Fix-14149][master] task finished so not dispatch (#14161)
---
 .../master/runner/WorkflowExecuteRunnable.java     | 33 +++++++++++++++++++++-
 .../master/runner/WorkflowExecuteRunnableTest.java | 18 ++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 12fed3ab56..7bf2f30e8c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -82,6 +82,7 @@ import 
org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
 import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -122,6 +123,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.BeanUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -983,7 +985,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                     }
                 }
                 // 4. submit to dispatch queue
-                taskExecuteRunnable.dispatch();
+                tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
 
                 stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance, 
taskInstance);
                 return true;
@@ -996,6 +998,35 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         }
     }
 
+    /**
+     * Sometimes (such as pause), if the task instance status has already been 
finished,
+     * there is no need to dispatch it
+     */
+    @VisibleForTesting
+    void tryToDispatchTaskInstance(TaskInstance taskInstance, 
TaskExecuteRunnable taskExecuteRunnable) {
+        if (!taskInstance.getState().isFinished()) {
+            taskExecuteRunnable.dispatch();
+        } else {
+            if (workflowExecuteContext.getWorkflowInstance().isBlocked()) {
+                TaskStateEvent processBlockEvent = TaskStateEvent.builder()
+                        
.processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
+                        .taskInstanceId(taskInstance.getId())
+                        .status(taskInstance.getState())
+                        .type(StateEventType.PROCESS_BLOCKED)
+                        .build();
+                this.stateEvents.add(processBlockEvent);
+            }
+
+            TaskStateEvent taskStateChangeEvent = TaskStateEvent.builder()
+                    
.processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
+                    .taskInstanceId(taskInstance.getId())
+                    .status(taskInstance.getState())
+                    .type(StateEventType.TASK_STATE_CHANGE)
+                    .build();
+            this.stateEvents.add(taskStateChangeEvent);
+        }
+    }
+
     /**
      * find task instance in db.
      * in case submit more than one same name task in the same time.
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index da96be2066..9599478f4a 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -35,9 +35,11 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.command.CommandService;
@@ -371,4 +373,20 @@ public class WorkflowExecuteRunnableTest {
         return schedulerList;
     }
 
+    @Test
+    void testTryToDispatchTaskInstance() {
+        // task instance already finished, not dispatch
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setState(TaskExecutionStatus.PAUSE);
+        Mockito.when(processInstance.isBlocked()).thenReturn(true);
+        TaskExecuteRunnable taskExecuteRunnable = 
Mockito.mock(TaskExecuteRunnable.class);
+        workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, 
taskExecuteRunnable);
+        Mockito.verify(taskExecuteRunnable, Mockito.never()).dispatch();
+
+        // submit success should dispatch
+        taskInstance = new TaskInstance();
+        taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
+        workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, 
taskExecuteRunnable);
+        Mockito.verify(taskExecuteRunnable).dispatch();
+    }
 }

Reply via email to