This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ccedd2ed95 [Improvement-16612][Master] For logical tasks on the
Master, there should be support for dry run (#16616)
ccedd2ed95 is described below
commit ccedd2ed95868b5bbb16a7451f112c6ca9669f45
Author: veli.yang <[email protected]>
AuthorDate: Sat Oct 12 14:25:56 2024 +0800
[Improvement-16612][Master] For logical tasks on the Master, there should
be support for dry run (#16616)
---
.../master/runner/execute/MasterTaskExecutor.java | 10 +++
.../master/integration/WorkflowOperator.java | 5 ++
.../integration/cases/WorkflowStartTestCase.java | 90 +++++++++++++++++++++-
3 files changed, 103 insertions(+), 2 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
index aba154fffa..d21e81fe97 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+import static
org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -103,6 +104,15 @@ public abstract class MasterTaskExecutor implements
Runnable {
TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
beforeExecute();
+ if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
+
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
+ taskExecutionContext.setEndTime(System.currentTimeMillis());
+
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
+
logicTaskInstanceExecutionEventSenderManager.successEventSender().sendMessage(taskExecutionContext);
+ log.info(
+ "The current execute mode is dry run, will stop the
logic task and set the taskInstance status to success");
+ return;
+ }
TaskInstanceLogHeader.printExecuteTaskHeader();
executeTask();
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
index 651026e3ac..3c30fe947c 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.integration;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -61,6 +62,7 @@ public class WorkflowOperator {
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
+ .dryRun(workflowTriggerDTO.dryRun)
.build();
final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -139,6 +141,9 @@ public class WorkflowOperator {
private final WorkflowDefinition workflowDefinition;
private final RunWorkflowCommandParam runWorkflowCommandParam;
+
+ @Builder.Default
+ private Flag dryRun = Flag.NO;
}
@Data
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index e441372086..f91594d7d6 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -87,12 +87,50 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
- workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
+ .matches(
+ workflowInstance ->
workflowInstance.getDryRun() == Flag.NO.getCode());
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .satisfiesExactly(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
+ });
+ });
+
+ assertThat(workflowRepository.getAll()).isEmpty();
+ }
+
+ @Test
+ @DisplayName("Test start a workflow with one fake task(A) dry run success")
+ public void testStartWorkflow_with_oneSuccessTaskDryRun() {
+ final String yaml =
"/it/start/workflow_with_one_fake_task_success.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getWorkflows().get(0);
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .dryRun(Flag.YES)
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
+ .matches(
+ workflowInstance ->
workflowInstance.getDryRun() == Flag.YES.getCode());
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
});
});
@@ -121,7 +159,9 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
.matches(
workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
- workflowInstance ->
workflowInstance.getIsSubWorkflow() == Flag.NO);
+ workflowInstance ->
workflowInstance.getIsSubWorkflow() == Flag.NO)
+ .matches(
+ workflowInstance ->
workflowInstance.getDryRun() == Flag.NO.getCode());
final List<WorkflowInstance> subWorkflowInstance =
repository.queryWorkflowInstance(context.getWorkflows().get(1));
@@ -131,6 +171,7 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
.satisfiesExactly(workflowInstance -> {
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
+
assertThat(workflowInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
});
Assertions
@@ -151,6 +192,51 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
assertThat(workflowRepository.getAll()).isEmpty();
}
+ @Test
+ @DisplayName("Test start a workflow with one sub workflow task(A) dry run,
will not execute")
+ public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
+ final String yaml =
"/it/start/workflow_with_sub_workflow_task_success.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition parentWorkflow =
context.getWorkflows().get(0);
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(parentWorkflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .dryRun(Flag.YES)
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .matches(
+ workflowInstance ->
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
+ .matches(
+ workflowInstance ->
workflowInstance.getIsSubWorkflow() == Flag.NO)
+ .matches(
+ workflowInstance ->
workflowInstance.getDryRun() == Flag.YES.getCode());
+
+ final List<WorkflowInstance> subWorkflowInstance =
+
repository.queryWorkflowInstance(context.getWorkflows().get(1));
+ Assertions
+ .assertThat(subWorkflowInstance)
+ .isEmpty();
+
+ Assertions
+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .satisfiesExactly(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
+ });
+ });
+
+ assertThat(workflowRepository.getAll()).isEmpty();
+ }
+
@Test
@DisplayName("Test start a workflow with one sub workflow task(A) failed")
public void testStartWorkflow_with_subWorkflowTask_failed() {