This is an automated email from the ASF dual-hosted git repository.

wenhemin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6e453de  [Feature] workflow level task dry run (#6104)
6e453de is described below

commit 6e453de241a6abb34c77462c9e778bf6f4013758
Author: Shukun Zhang <[email protected]>
AuthorDate: Thu Sep 30 11:18:06 2021 +0800

    [Feature] workflow level task dry run (#6104)
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
    
    * feat workflow level task dry run
---
 .../api/controller/ExecutorController.java         |  5 ++--
 .../api/service/ExecutorService.java               |  3 ++-
 .../api/service/impl/ExecutorServiceImpl.java      |  8 ++++---
 .../api/service/ExecutorServiceTest.java           | 14 +++++------
 .../apache/dolphinscheduler/common/Constants.java  |  7 ++++++
 .../dolphinscheduler/dao/entity/Command.java       | 20 +++++++++++++++-
 .../dolphinscheduler/dao/entity/ErrorCommand.java  | 17 ++++++++++++-
 .../dao/entity/ProcessInstance.java                | 16 +++++++++++++
 .../dolphinscheduler/dao/entity/TaskInstance.java  | 14 +++++++++++
 .../dolphinscheduler/dao/mapper/CommandMapper.xml  |  2 +-
 .../dao/mapper/ProcessInstanceMapper.xml           |  4 ++--
 .../dao/mapper/TaskInstanceMapper.xml              |  4 ++--
 .../master/runner/WorkflowExecuteThread.java       |  3 +++
 .../server/worker/runner/TaskExecuteThread.java    | 28 +++++++++++++++-------
 .../service/process/ProcessService.java            |  7 ++++--
 .../service/process/ProcessServiceTest.java        |  1 +
 sql/dolphinscheduler_mysql.sql                     |  4 ++++
 sql/dolphinscheduler_postgre.sql                   |  4 ++++
 18 files changed, 130 insertions(+), 31 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 93a31f2..db28f0b 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -123,7 +123,8 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = 
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
                                        @RequestParam(value = "timeout", 
required = false) Integer timeout,
                                        @RequestParam(value = "startParams", 
required = false) String startParams,
-                                       @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber) {
+                                       @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber,
+                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun) {
 
         if (timeout == null) {
             timeout = Constants.MAX_TASK_TIMEOUT;
@@ -133,7 +134,7 @@ public class ExecutorController extends BaseController {
             startParamMap = JSONUtils.toMap(startParams);
         }
         Map<String, Object> result = 
execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, 
scheduleTime, execType, failureStrategy,
-            startNodeList, taskDependType, warningType, warningGroupId, 
runMode, processInstancePriority, workerGroup, environmentCode,timeout, 
startParamMap, expectedParallelismNumber);
+            startNodeList, taskDependType, warningType, warningGroupId, 
runMode, processInstancePriority, workerGroup, environmentCode,timeout, 
startParamMap, expectedParallelismNumber, dryRun);
         return returnDataList(result);
     }
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index e868815..72d1892 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -62,7 +62,8 @@ public interface ExecutorService {
                                             TaskDependType taskDependType, 
WarningType warningType, int warningGroupId,
                                             RunMode runMode,
                                             Priority processInstancePriority, 
String workerGroup, Long environmentCode, Integer timeout,
-                                            Map<String, String> startParams, 
Integer expectedParallelismNumber);
+                                            Map<String, String> startParams, 
Integer expectedParallelismNumber,
+                                            int dryRun);
 
     /**
      * check whether the process definition can be executed
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index a213573..b910c4b 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -133,7 +133,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                                                    TaskDependType 
taskDependType, WarningType warningType, int warningGroupId,
                                                    RunMode runMode,
                                                    Priority 
processInstancePriority, String workerGroup, Long environmentCode,Integer 
timeout,
-                                                   Map<String, String> 
startParams, Integer expectedParallelismNumber) {
+                                                   Map<String, String> 
startParams, Integer expectedParallelismNumber,
+                                                   int dryRun) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -170,7 +171,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
          */
         int create = this.createCommand(commandType, 
processDefinition.getCode(),
                 taskDependType, failureStrategy, startNodeList, cronTime, 
warningType, loginUser.getId(),
-                warningGroupId, runMode, processInstancePriority, workerGroup, 
environmentCode, startParams, expectedParallelismNumber);
+                warningGroupId, runMode, processInstancePriority, workerGroup, 
environmentCode, startParams, expectedParallelismNumber, dryRun);
 
         if (create > 0) {
             processDefinition.setWarningGroupId(warningGroupId);
@@ -507,7 +508,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                               String startNodeList, String schedule, 
WarningType warningType,
                               int executorId, int warningGroupId,
                               RunMode runMode, Priority 
processInstancePriority, String workerGroup, Long environmentCode,
-                              Map<String, String> startParams, Integer 
expectedParallelismNumber) {
+                              Map<String, String> startParams, Integer 
expectedParallelismNumber, int dryRun) {
 
         /**
          * instantiate command schedule instance
@@ -543,6 +544,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         command.setProcessInstancePriority(processInstancePriority);
         command.setWorkerGroup(workerGroup);
         command.setEnvironmentCode(environmentCode);
+        command.setDryRun(dryRun);
 
         Date start = null;
         Date end = null;
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 01e81ad..e308f58 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -158,7 +158,7 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 
0);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 
0, Constants.DRY_RUN_FLAG_NO);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
 
@@ -176,7 +176,7 @@ public class ExecutorServiceTest {
                 null, "n1,n2",
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0, Constants.DRY_RUN_FLAG_NO);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
 
@@ -194,7 +194,7 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO);
         Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, 
result.get(Constants.STATUS));
         verify(processService, times(0)).createCommand(any(Command.class));
     }
@@ -211,7 +211,7 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(1)).createCommand(any(Command.class));
     }
@@ -228,7 +228,7 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 
0, Constants.DRY_RUN_FLAG_NO);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(31)).createCommand(any(Command.class));
 
@@ -246,7 +246,7 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
15);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
15, Constants.DRY_RUN_FLAG_NO);
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         verify(processService, times(15)).createCommand(any(Command.class));
 
@@ -261,7 +261,7 @@ public class ExecutorServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 
0, Constants.DRY_RUN_FLAG_NO);
         Assert.assertEquals(result.get(Constants.STATUS), 
Status.MASTER_NOT_EXISTS);
 
     }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 417ce59..2d69504 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -1093,4 +1093,11 @@ public final class Constants {
     public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName";
     public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId";
     public static final String TASK_DEPENDENCE_DEFINITION_NAME = 
"definitionName";
+
+    /**
+     * dry run flag
+     */
+    public static final int DRY_RUN_FLAG_NO = 0;
+    public static final int DRY_RUN_FLAG_YES = 1;
+
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index e3088d0..b1ed217 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -126,6 +126,12 @@ public class Command {
     @TableField("environment_code")
     private Long environmentCode;
 
+    /**
+     * dry run flag
+     */
+    @TableField("dry_run")
+    private int dryRun;
+
     public Command() {
         this.taskDependType = TaskDependType.TASK_POST;
         this.failureStrategy = FailureStrategy.CONTINUE;
@@ -145,7 +151,8 @@ public class Command {
             Date scheduleTime,
             String workerGroup,
             Long environmentCode,
-            Priority processInstancePriority) {
+            Priority processInstancePriority,
+            int dryRun) {
         this.commandType = commandType;
         this.executorId = executorId;
         this.processDefinitionCode = processDefinitionCode;
@@ -160,6 +167,7 @@ public class Command {
         this.workerGroup = workerGroup;
         this.environmentCode = environmentCode;
         this.processInstancePriority = processInstancePriority;
+        this.dryRun = dryRun;
     }
 
     public TaskDependType getTaskDependType() {
@@ -282,6 +290,14 @@ public class Command {
         this.environmentCode = environmentCode;
     }
 
+    public int getDryRun() {
+        return dryRun;
+    }
+
+    public void setDryRun(int dryRun) {
+        this.dryRun = dryRun;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -358,6 +374,7 @@ public class Command {
         result = 31 * result + (updateTime != null ? updateTime.hashCode() : 
0);
         result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 
0);
         result = 31 * result + (environmentCode != null ? 
environmentCode.hashCode() : 0);
+        result = 31 * result + dryRun;
         return result;
     }
 
@@ -379,6 +396,7 @@ public class Command {
                 + ", updateTime=" + updateTime
                 + ", workerGroup='" + workerGroup + '\''
                 + ", environmentCode='" + environmentCode + '\''
+                + ", dryRun='" + dryRun + '\''
                 + '}';
     }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
index d03570d..20df39b 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
@@ -120,7 +120,12 @@ public class ErrorCommand {
      */
     private Long environmentCode;
 
-    public ErrorCommand(){}
+    /**
+     * dry run flag
+     */
+    private int dryRun;
+
+    public ErrorCommand() {}
 
     public ErrorCommand(Command command, String message) {
         this.id = command.getId();
@@ -138,6 +143,7 @@ public class ErrorCommand {
         this.environmentCode = command.getEnvironmentCode();
         this.processInstancePriority = command.getProcessInstancePriority();
         this.message = message;
+        this.dryRun = command.getDryRun();
     }
 
     public TaskDependType getTaskDependType() {
@@ -268,6 +274,14 @@ public class ErrorCommand {
         this.environmentCode = environmentCode;
     }
 
+    public int getDryRun() {
+        return dryRun;
+    }
+
+    public void setDryRun(int dryRun) {
+        this.dryRun = dryRun;
+    }
+
     @Override
     public String toString() {
         return "ErrorCommand{"
@@ -287,6 +301,7 @@ public class ErrorCommand {
                 + ", message='" + message + '\''
                 + ", workerGroup='" + workerGroup + '\''
                 + ", environmentCode='" + environmentCode + '\''
+                + ", dryRun='" + dryRun + '\''
                 + '}';
     }
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 693f019..18c386b 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -239,6 +239,11 @@ public class ProcessInstance {
      */
     private String varPool;
 
+    /**
+     * dry run flag
+     */
+    private int dryRun;
+
     public ProcessInstance() {
 
     }
@@ -503,6 +508,14 @@ public class ProcessInstance {
         this.environmentCode = environmentCode;
     }
 
+    public int getDryRun() {
+        return dryRun;
+    }
+
+    public void setDryRun(int dryRun) {
+        this.dryRun = dryRun;
+    }
+
     /**
      * add command to history
      *
@@ -668,6 +681,9 @@ public class ProcessInstance {
             + ", processDefinitionVersion='"
             + processDefinitionVersion
             + '\''
+            + ", dryRun='"
+            + dryRun
+            + '\''
             + '}';
     }
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 7948df4..ac18975 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -261,6 +261,11 @@ public class TaskInstance implements Serializable {
      */
     private String taskParams;
 
+    /**
+     * dry run flag
+     */
+    private int dryRun;
+
     public void init(String host, Date startTime, String executePath) {
         this.host = host;
         this.startTime = startTime;
@@ -529,6 +534,14 @@ public class TaskInstance implements Serializable {
         this.executorName = executorName;
     }
 
+    public int getDryRun() {
+        return dryRun;
+    }
+
+    public void setDryRun(int dryRun) {
+        this.dryRun = dryRun;
+    }
+
     public boolean isTaskComplete() {
 
         return this.getState().typeIsPause()
@@ -654,6 +667,7 @@ public class TaskInstance implements Serializable {
                 + ", executorId=" + executorId
                 + ", executorName='" + executorName + '\''
                 + ", delayTime=" + delayTime
+                + ", dryRun=" + dryRun
                 + '}';
     }
 
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index d0e9141..ab70336 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -21,7 +21,7 @@
     <select id="getOneToRun" 
resultType="org.apache.dolphinscheduler.dao.entity.Command">
         select cmd.id, cmd.command_type, cmd.process_definition_code, 
cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
         cmd.warning_type, cmd.warning_group_id, cmd.schedule_time, 
cmd.start_time, cmd.executor_id, cmd.update_time,
-        cmd.process_instance_priority, cmd.worker_group, cmd.environment_code
+        cmd.process_instance_priority, cmd.worker_group, cmd.environment_code, 
cmd.dry_run
         from t_ds_command cmd
         join t_ds_process_definition definition on cmd.process_definition_code 
= definition.code
         where definition.release_state = 1 AND definition.flag = 1
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f1b074d..08db3af 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -23,7 +23,7 @@
         command_type, command_param, task_depend_type, max_try_times, 
failure_strategy, warning_type,
         warning_group_id, schedule_time, command_start_time, global_params, 
flag,
         update_time, is_sub_process, executor_id, history_cmd,
-        process_instance_priority, worker_group,environment_code, timeout, 
tenant_id, var_pool
+        process_instance_priority, worker_group,environment_code, timeout, 
tenant_id, var_pool, dry_run
     </sql>
     <select id="queryDetailById" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
@@ -90,7 +90,7 @@
     <select id="queryProcessInstanceListPaging" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select instance.id, instance.command_type, instance.executor_id, 
instance.process_definition_version,
         instance.process_definition_code, instance.name, instance.state, 
instance.schedule_time, instance.start_time,
-        instance.end_time, instance.run_times, instance.recovery, instance.host
+        instance.end_time, instance.run_times, instance.recovery, 
instance.host, instance.dry_run
         from t_ds_process_instance instance
         join t_ds_process_definition define ON 
instance.process_definition_code = define.code
         where instance.is_sub_process=0
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index dab824d..0964d1c 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,13 +22,13 @@
         id, name, task_type, process_instance_id, task_code, 
task_definition_version, state, submit_time,
         start_time, end_time, host, execute_path, log_path, alert_flag, 
retry_times, pid, app_link,
         flag, retry_interval, max_retry_times, task_instance_priority, 
worker_group,environment_code , executor_id,
-        first_submit_time, delay_time, task_params, var_pool
+        first_submit_time, delay_time, task_params, var_pool, dry_run
     </sql>
     <sql id="baseSqlV2">
         ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, 
${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, 
${alias}.submit_time,
         ${alias}.start_time, ${alias}.end_time, ${alias}.host, 
${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, 
${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
         ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, 
${alias}.task_instance_priority, 
${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
-        ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, 
${alias}.var_pool
+        ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, 
${alias}.var_pool, ${alias}.dry_run
     </sql>
     <update id="setFailoverByHostAndStateArray">
         update t_ds_task_instance
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 9de1a0b..eae6abe 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -702,6 +702,9 @@ public class WorkflowExecuteThread implements Runnable {
             // task instance flag
             taskInstance.setFlag(Flag.YES);
 
+            // task dry run flag
+            taskInstance.setDryRun(processInstance.getDryRun());
+
             // task instance retry times
             taskInstance.setRetryTimes(0);
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 04e4749..eaa7cc4 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -167,10 +167,14 @@ public class TaskExecuteThread implements Runnable, 
Delayed {
             }
             logger.info("the task begins to execute. task instance id: {}", 
taskExecutionContext.getTaskInstanceId());
 
+            TaskInstance taskInstance = 
processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
+            int dryRun = taskInstance.getDryRun();
             // copy hdfs/minio file to local
-            downloadResource(taskExecutionContext.getExecutePath(),
-                    taskExecutionContext.getResources(),
-                    logger);
+            if (dryRun == Constants.DRY_RUN_FLAG_NO) {
+                downloadResource(taskExecutionContext.getExecutePath(),
+                        taskExecutionContext.getResources(),
+                        logger);
+            }
 
             taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
             taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@@ -198,14 +202,20 @@ public class TaskExecuteThread implements Runnable, 
Delayed {
             this.task.init();
             //init varPool
             
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
-            // task handle
-            this.task.handle();
 
-            // task result process
-            if (this.task.getNeedAlert()) {
-                sendAlert(this.task.getTaskAlertInfo());
+            if (dryRun == Constants.DRY_RUN_FLAG_NO) {
+                // task handle
+                this.task.handle();
+
+                // task result process
+                if (this.task.getNeedAlert()) {
+                    sendAlert(this.task.getTaskAlertInfo());
+                }
+                responseCommand.setStatus(this.task.getExitStatus().getCode());
+            } else {
+                responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
+                task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
             }
-            responseCommand.setStatus(this.task.getExitStatus().getCode());
             responseCommand.setEndTime(new Date());
             responseCommand.setProcessId(this.task.getProcessId());
             responseCommand.setAppIds(this.task.getAppIds());
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a5d1329..a36cadc 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -552,7 +552,8 @@ public class ProcessService {
                 processInstance.getScheduleTime(),
                 processInstance.getWorkerGroup(),
                 processInstance.getEnvironmentCode(),
-                processInstance.getProcessInstancePriority()
+                processInstance.getProcessInstancePriority(),
+                processInstance.getDryRun()
             );
             saveCommand(command);
             return;
@@ -632,6 +633,7 @@ public class ProcessService {
         processInstance.setWarningType(warningType);
         Integer warningGroupId = command.getWarningGroupId() == null ? 0 : 
command.getWarningGroupId();
         processInstance.setWarningGroupId(warningGroupId);
+        processInstance.setDryRun(command.getDryRun());
 
         // schedule time
         Date scheduleTime = getScheduleTime(command, cmdParam);
@@ -1292,7 +1294,8 @@ public class ProcessService {
             parentProcessInstance.getScheduleTime(),
             task.getWorkerGroup(),
             task.getEnvironmentCode(),
-            parentProcessInstance.getProcessInstancePriority()
+            parentProcessInstance.getProcessInstancePriority(),
+            parentProcessInstance.getDryRun()
         );
     }
 
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 69b708a..e13558d 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -296,6 +296,7 @@ public class ProcessServiceTest {
         commandParams.put(CMD_PARAM_START_PARAMS, 
JSONUtils.toJsonString(startParams));
         command5.setCommandParam(JSONUtils.toJsonString(commandParams));
         command5.setCommandType(CommandType.START_PROCESS);
+        command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
         ProcessInstance processInstance1 = 
processService.handleCommand(logger, host, validThreadNum, command5);
         
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
     }
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 014a94a..f1c7f56 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -332,6 +332,7 @@ CREATE TABLE `t_ds_command` (
   `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance 
priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
   `worker_group` varchar(64)  COMMENT 'worker group',
   `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
+  `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag:0 normal, 1 dry run',
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
@@ -381,6 +382,7 @@ CREATE TABLE `t_ds_error_command` (
   `worker_group` varchar(64)  COMMENT 'worker group',
   `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
   `message` text COMMENT 'message',
+  `dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
   PRIMARY KEY (`id`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
 
@@ -586,6 +588,7 @@ CREATE TABLE `t_ds_process_instance` (
   `timeout` int(11) DEFAULT '0' COMMENT 'time out',
   `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
   `var_pool` longtext COMMENT 'var_pool',
+  `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag: 0 normal, 1 dry run ',
   PRIMARY KEY (`id`),
   KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
   KEY `start_time_index` (`start_time`) USING BTREE
@@ -822,6 +825,7 @@ CREATE TABLE `t_ds_task_instance` (
   `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
   `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
   `var_pool` longtext COMMENT 'var_pool',
+  `dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
   PRIMARY KEY (`id`),
   KEY `process_instance_id` (`process_instance_id`) USING BTREE,
   CONSTRAINT `foreign_key_instance_id` FOREIGN KEY (`process_instance_id`) 
REFERENCES `t_ds_process_instance` (`id`) ON DELETE CASCADE
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 1c6694c..3054f19 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -254,6 +254,7 @@ CREATE TABLE t_ds_command (
   process_instance_priority int DEFAULT NULL ,
   worker_group varchar(64),
   environment_code bigint DEFAULT '-1',
+  dry_run int DEFAULT '0' ,
   PRIMARY KEY (id)
 ) ;
 
@@ -297,6 +298,7 @@ CREATE TABLE t_ds_error_command (
   worker_group varchar(64),
   environment_code bigint DEFAULT '-1',
   message text ,
+  dry_ru int DEFAULT '0' ,
   PRIMARY KEY (id)
 );
 --
@@ -490,6 +492,7 @@ CREATE TABLE t_ds_process_instance (
   timeout int DEFAULT '0' ,
   tenant_id int NOT NULL DEFAULT '-1' ,
   var_pool text ,
+  dry_run int DEFAULT '0' ,
   PRIMARY KEY (id)
 ) ;
 
@@ -703,6 +706,7 @@ CREATE TABLE t_ds_task_instance (
   first_submit_time timestamp DEFAULT NULL ,
   delay_time int DEFAULT '0' ,
   var_pool text ,
+  dry_run int DEFAULT '0' ,
   PRIMARY KEY (id),
   CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id) 
REFERENCES t_ds_process_instance(id) ON DELETE CASCADE
 ) ;

Reply via email to