This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2284835d28 [Improvement-16907][Master] Response for stop/pause event
when workflow instance status is ready_stop/ready_pause (#16908)
2284835d28 is described below
commit 2284835d28f6c5e1a2c355e2316bf771ef31555b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 25 13:54:43 2024 +0800
[Improvement-16907][Master] Response for stop/pause event when workflow
instance status is ready_stop/ready_pause (#16908)
---
.../statemachine/AbstractWorkflowStateAction.java | 25 ++++++++++++++++++++++
.../statemachine/WorkflowReadyStopStateAction.java | 2 +-
.../statemachine/WorkflowRunningStateAction.java | 23 ++------------------
3 files changed, 28 insertions(+), 22 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
index 8b78903a5a..1cd6ad21b7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
@@ -22,6 +22,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.WorkflowCacheRepository;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
@@ -90,6 +91,30 @@ public abstract class AbstractWorkflowStateAction implements
IWorkflowStateActio
}
}
+ protected void killActiveTask(final IWorkflowExecutionRunnable
workflowExecutionRunnable) {
+ try {
+
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
+ workflowExecutionRunnable
+ .getWorkflowExecutionGraph()
+ .getActiveTaskExecutionRunnable()
+ .forEach(ITaskExecutionRunnable::kill);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
+ }
+
+ protected void pauseActiveTask(final IWorkflowExecutionRunnable
workflowExecutionRunnable) {
+ try {
+
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
+ workflowExecutionRunnable
+ .getWorkflowExecutionGraph()
+ .getActiveTaskExecutionRunnable()
+ .forEach(ITaskExecutionRunnable::pause);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
+ }
+ }
+
protected void onTaskFinish(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
final ITaskExecutionRunnable
taskExecutionRunnable) {
final IWorkflowExecutionGraph workflowExecutionGraph =
workflowExecutionRunnable.getWorkflowExecutionGraph();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
index c35d712bd3..8b1f393ffe 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java
@@ -74,7 +74,7 @@ public class WorkflowReadyStopStateAction extends
AbstractWorkflowStateAction {
public void stopEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
final WorkflowStopLifecycleEvent
workflowStopEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowStopEvent);
+ super.killActiveTask(workflowExecutionRunnable);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
index 914d050191..2ef810aba4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
@@ -18,10 +18,8 @@
package org.apache.dolphinscheduler.server.master.engine.workflow.statemachine;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
@@ -64,15 +62,7 @@ public class WorkflowRunningStateAction extends
AbstractWorkflowStateAction {
final WorkflowPauseLifecycleEvent
workflowPauseEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.transformWorkflowInstanceState(workflowExecutionRunnable,
WorkflowExecutionStatus.READY_PAUSE);
- try {
-
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
- workflowExecutionRunnable
- .getWorkflowExecutionGraph()
- .getActiveTaskExecutionRunnable()
- .forEach(ITaskExecutionRunnable::pause);
- } finally {
- LogUtils.removeWorkflowInstanceIdMDC();
- }
+ super.pauseActiveTask(workflowExecutionRunnable);
}
@Override
@@ -87,16 +77,7 @@ public class WorkflowRunningStateAction extends
AbstractWorkflowStateAction {
final WorkflowStopLifecycleEvent
workflowStopEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.transformWorkflowInstanceState(workflowExecutionRunnable,
WorkflowExecutionStatus.READY_STOP);
- // do pause action
- try {
-
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
- workflowExecutionRunnable
- .getWorkflowExecutionGraph()
- .getActiveTaskExecutionRunnable()
- .forEach(ITaskExecutionRunnable::kill);
- } finally {
- LogUtils.removeWorkflowInstanceIdMDC();
- }
+ super.killActiveTask(workflowExecutionRunnable);
}
@Override