This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7ae4fb3787 [Fix-14149][master] task finished so not dispatch (#14161)
7ae4fb3787 is described below
commit 7ae4fb378779077c75cd6411ef8e3b68f232bd84
Author: eye-gu <[email protected]>
AuthorDate: Thu Aug 31 20:08:09 2023 +0800
[Fix-14149][master] task finished so not dispatch (#14161)
---
.../master/runner/WorkflowExecuteRunnable.java | 33 +++++++++++++++++++++-
.../master/runner/WorkflowExecuteRunnableTest.java | 18 ++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 12fed3ab56..7bf2f30e8c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -82,6 +82,7 @@ import
org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -122,6 +123,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -983,7 +985,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
}
// 4. submit to dispatch queue
- taskExecuteRunnable.dispatch();
+ tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance,
taskInstance);
return true;
@@ -996,6 +998,35 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
}
}
+ /**
+ * Dispatches the given task unless its task instance is already in a finished state.
+ * <p>
+ * Sometimes (such as pause) the task instance status has already been finished by the
+ * time it reaches this point, so there is no need to dispatch it. In that case
+ * {@code dispatch()} is not called; instead a {@code TASK_STATE_CHANGE} event carrying
+ * the task instance's current state is added to {@code stateEvents}. If the workflow
+ * instance reports {@code isBlocked()}, a {@code PROCESS_BLOCKED} event is added first.
+ *
+ * @param taskInstance the task instance whose state decides whether to dispatch
+ * @param taskExecuteRunnable the runnable that is dispatched when the task is not finished
+ */
+ @VisibleForTesting
+ void tryToDispatchTaskInstance(TaskInstance taskInstance,
TaskExecuteRunnable taskExecuteRunnable) {
+ if (!taskInstance.getState().isFinished()) {
+ taskExecuteRunnable.dispatch();
+ } else {
+ if (workflowExecuteContext.getWorkflowInstance().isBlocked()) {
+ TaskStateEvent processBlockEvent = TaskStateEvent.builder()
+
.processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
+ .taskInstanceId(taskInstance.getId())
+ .status(taskInstance.getState())
+ .type(StateEventType.PROCESS_BLOCKED)
+ .build();
+ this.stateEvents.add(processBlockEvent);
+ }
+
+ TaskStateEvent taskStateChangeEvent = TaskStateEvent.builder()
+
.processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
+ .taskInstanceId(taskInstance.getId())
+ .status(taskInstance.getState())
+ .type(StateEventType.TASK_STATE_CHANGE)
+ .build();
+ this.stateEvents.add(taskStateChangeEvent);
+ }
+ }
+
/**
* find task instance in db.
* in case submit more than one same name task in the same time.
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index da96be2066..9599478f4a 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -35,9 +35,11 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.command.CommandService;
@@ -371,4 +373,20 @@ public class WorkflowExecuteRunnableTest {
return schedulerList;
}
+ @Test
+ void testTryToDispatchTaskInstance() {
+ // case 1: state is PAUSE and the workflow instance is mocked as blocked; the task is expected to be treated as already finished, so dispatch() must never be called
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setState(TaskExecutionStatus.PAUSE);
+ Mockito.when(processInstance.isBlocked()).thenReturn(true);
+ TaskExecuteRunnable taskExecuteRunnable =
Mockito.mock(TaskExecuteRunnable.class);
+ workflowExecuteThread.tryToDispatchTaskInstance(taskInstance,
taskExecuteRunnable);
+ Mockito.verify(taskExecuteRunnable, Mockito.never()).dispatch();
+
+ // case 2: state is SUBMITTED_SUCCESS, so dispatch() is expected; the same mock is reused, and the plain verify() requires exactly one call in total
+ taskInstance = new TaskInstance();
+ taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
+ workflowExecuteThread.tryToDispatchTaskInstance(taskInstance,
taskExecuteRunnable);
+ Mockito.verify(taskExecuteRunnable).dispatch();
+ }
}