This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ccedd2ed95 [Improvement-16612][Master] For logical tasks on the Master, there should be support for dry run (#16616)
ccedd2ed95 is described below

commit ccedd2ed95868b5bbb16a7451f112c6ca9669f45
Author: veli.yang <[email protected]>
AuthorDate: Sat Oct 12 14:25:56 2024 +0800

    [Improvement-16612][Master] For logical tasks on the Master, there should be support for dry run (#16616)
---
 .../master/runner/execute/MasterTaskExecutor.java  | 10 +++
 .../master/integration/WorkflowOperator.java       |  5 ++
 .../integration/cases/WorkflowStartTestCase.java   | 90 +++++++++++++++++++++-
 3 files changed, 103 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
index aba154fffa..d21e81fe97 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.runner.execute;
 
 import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;
 
 import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -103,6 +104,15 @@ public abstract class MasterTaskExecutor implements Runnable {
             TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
             beforeExecute();
 
+            if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
+                taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+                taskExecutionContext.setEndTime(System.currentTimeMillis());
+                MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
+                logicTaskInstanceExecutionEventSenderManager.successEventSender().sendMessage(taskExecutionContext);
+                log.info(
+                        "The current execute mode is dry run, will stop the logic task and set the taskInstance status to success");
+                return;
+            }
             TaskInstanceLogHeader.printExecuteTaskHeader();
             executeTask();
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
index 651026e3ac..3c30fe947c 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.integration;
 
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -61,6 +62,7 @@ public class WorkflowOperator {
                 
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
                 
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
                 
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
+                .dryRun(workflowTriggerDTO.dryRun)
                 .build();
 
         final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -139,6 +141,9 @@ public class WorkflowOperator {
         private final WorkflowDefinition workflowDefinition;
 
         private final RunWorkflowCommandParam runWorkflowCommandParam;
+
+        @Builder.Default
+        private Flag dryRun = Flag.NO;
     }
 
     @Data
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index e441372086..f91594d7d6 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -87,12 +87,50 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
                     Assertions
                             
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
                             .matches(
-                                    workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+                                    workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getDryRun() == Flag.NO.getCode());
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflow))
+                            .satisfiesExactly(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("A");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                                
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
+                            });
+                });
+
+        assertThat(workflowRepository.getAll()).isEmpty();
+    }
+
+    @Test
+    @DisplayName("Test start a workflow with one fake task(A) dry run success")
+    public void testStartWorkflow_with_oneSuccessTaskDryRun() {
+        final String yaml = 
"/it/start/workflow_with_one_fake_task_success.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getWorkflows().get(0);
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .dryRun(Flag.YES)
+                .build();
+        final Integer workflowInstanceId = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    Assertions
+                            
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getDryRun() == Flag.YES.getCode());
                     Assertions
                             .assertThat(repository.queryTaskInstance(workflow))
                             .satisfiesExactly(taskInstance -> {
                                 
assertThat(taskInstance.getName()).isEqualTo("A");
                                 
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                                
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
                             });
                 });
 
@@ -121,7 +159,9 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
                             .matches(
                                     workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
                             .matches(
-                                    workflowInstance -> 
workflowInstance.getIsSubWorkflow() == Flag.NO);
+                                    workflowInstance -> 
workflowInstance.getIsSubWorkflow() == Flag.NO)
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getDryRun() == Flag.NO.getCode());
 
                     final List<WorkflowInstance> subWorkflowInstance =
                             
repository.queryWorkflowInstance(context.getWorkflows().get(1));
@@ -131,6 +171,7 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
                             .satisfiesExactly(workflowInstance -> {
                                 
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
                                 
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
+                                
assertThat(workflowInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
                             });
 
                     Assertions
@@ -151,6 +192,51 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
         assertThat(workflowRepository.getAll()).isEmpty();
     }
 
+    @Test
+    @DisplayName("Test start a workflow with one sub workflow task(A) dry run, will not execute")
+    public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
+        final String yaml = 
"/it/start/workflow_with_sub_workflow_task_success.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition parentWorkflow = 
context.getWorkflows().get(0);
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(parentWorkflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .dryRun(Flag.YES)
+                .build();
+        final Integer workflowInstanceId = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+
+                    Assertions
+                            
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getIsSubWorkflow() == Flag.NO)
+                            .matches(
+                                    workflowInstance -> 
workflowInstance.getDryRun() == Flag.YES.getCode());
+
+                    final List<WorkflowInstance> subWorkflowInstance =
+                            
repository.queryWorkflowInstance(context.getWorkflows().get(1));
+                    Assertions
+                            .assertThat(subWorkflowInstance)
+                            .isEmpty();
+
+                    Assertions
+                            
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .satisfiesExactly(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                                
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
+                            });
+                });
+
+        assertThat(workflowRepository.getAll()).isEmpty();
+    }
+
     @Test
     @DisplayName("Test start a workflow with one sub workflow task(A) failed")
     public void testStartWorkflow_with_subWorkflowTask_failed() {

Reply via email to