This is an automated email from the ASF dual-hosted git repository.
wenhemin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6e453de [Feature] workflow level task dry run (#6104)
6e453de is described below
commit 6e453de241a6abb34c77462c9e778bf6f4013758
Author: Shukun Zhang <[email protected]>
AuthorDate: Thu Sep 30 11:18:06 2021 +0800
[Feature] workflow level task dry run (#6104)
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
* feat workflow level task dry run
---
.../api/controller/ExecutorController.java | 5 ++--
.../api/service/ExecutorService.java | 3 ++-
.../api/service/impl/ExecutorServiceImpl.java | 8 ++++---
.../api/service/ExecutorServiceTest.java | 14 +++++------
.../apache/dolphinscheduler/common/Constants.java | 7 ++++++
.../dolphinscheduler/dao/entity/Command.java | 20 +++++++++++++++-
.../dolphinscheduler/dao/entity/ErrorCommand.java | 17 ++++++++++++-
.../dao/entity/ProcessInstance.java | 16 +++++++++++++
.../dolphinscheduler/dao/entity/TaskInstance.java | 14 +++++++++++
.../dolphinscheduler/dao/mapper/CommandMapper.xml | 2 +-
.../dao/mapper/ProcessInstanceMapper.xml | 4 ++--
.../dao/mapper/TaskInstanceMapper.xml | 4 ++--
.../master/runner/WorkflowExecuteThread.java | 3 +++
.../server/worker/runner/TaskExecuteThread.java | 28 +++++++++++++++-------
.../service/process/ProcessService.java | 7 ++++--
.../service/process/ProcessServiceTest.java | 1 +
sql/dolphinscheduler_mysql.sql | 4 ++++
sql/dolphinscheduler_postgre.sql | 4 ++++
18 files changed, 130 insertions(+), 31 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 93a31f2..db28f0b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -123,7 +123,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value =
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "timeout",
required = false) Integer timeout,
@RequestParam(value = "startParams",
required = false) String startParams,
- @RequestParam(value =
"expectedParallelismNumber", required = false) Integer
expectedParallelismNumber) {
+ @RequestParam(value =
"expectedParallelismNumber", required = false) Integer
expectedParallelismNumber,
+ @RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@@ -133,7 +134,7 @@ public class ExecutorController extends BaseController {
startParamMap = JSONUtils.toMap(startParams);
}
Map<String, Object> result =
execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
scheduleTime, execType, failureStrategy,
- startNodeList, taskDependType, warningType, warningGroupId,
runMode, processInstancePriority, workerGroup, environmentCode,timeout,
startParamMap, expectedParallelismNumber);
+ startNodeList, taskDependType, warningType, warningGroupId,
runMode, processInstancePriority, workerGroup, environmentCode,timeout,
startParamMap, expectedParallelismNumber, dryRun);
return returnDataList(result);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index e868815..72d1892 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -62,7 +62,8 @@ public interface ExecutorService {
TaskDependType taskDependType,
WarningType warningType, int warningGroupId,
RunMode runMode,
Priority processInstancePriority,
String workerGroup, Long environmentCode, Integer timeout,
- Map<String, String> startParams,
Integer expectedParallelismNumber);
+ Map<String, String> startParams,
Integer expectedParallelismNumber,
+ int dryRun);
/**
* check whether the process definition can be executed
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index a213573..b910c4b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -133,7 +133,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
TaskDependType
taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
Priority
processInstancePriority, String workerGroup, Long environmentCode,Integer
timeout,
- Map<String, String>
startParams, Integer expectedParallelismNumber) {
+ Map<String, String>
startParams, Integer expectedParallelismNumber,
+ int dryRun) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -170,7 +171,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
*/
int create = this.createCommand(commandType,
processDefinition.getCode(),
taskDependType, failureStrategy, startNodeList, cronTime,
warningType, loginUser.getId(),
- warningGroupId, runMode, processInstancePriority, workerGroup,
environmentCode, startParams, expectedParallelismNumber);
+ warningGroupId, runMode, processInstancePriority, workerGroup,
environmentCode, startParams, expectedParallelismNumber, dryRun);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
@@ -507,7 +508,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
String startNodeList, String schedule,
WarningType warningType,
int executorId, int warningGroupId,
RunMode runMode, Priority
processInstancePriority, String workerGroup, Long environmentCode,
- Map<String, String> startParams, Integer
expectedParallelismNumber) {
+ Map<String, String> startParams, Integer
expectedParallelismNumber, int dryRun) {
/**
* instantiate command schedule instance
@@ -543,6 +544,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
command.setProcessInstancePriority(processInstancePriority);
command.setWorkerGroup(workerGroup);
command.setEnvironmentCode(environmentCode);
+ command.setDryRun(dryRun);
Date start = null;
Date end = null;
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 01e81ad..e308f58 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -158,7 +158,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null,
0);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null,
0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@@ -176,7 +176,7 @@ public class ExecutorServiceTest {
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@@ -194,7 +194,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR,
result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
}
@@ -211,7 +211,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
@@ -228,7 +228,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null,
0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
@@ -246,7 +246,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
15);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
15, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
@@ -261,7 +261,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0);
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null,
0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(result.get(Constants.STATUS),
Status.MASTER_NOT_EXISTS);
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 417ce59..2d69504 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -1093,4 +1093,11 @@ public final class Constants {
public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName";
public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId";
public static final String TASK_DEPENDENCE_DEFINITION_NAME =
"definitionName";
+
+ /**
+ * dry run flag
+ */
+ public static final int DRY_RUN_FLAG_NO = 0;
+ public static final int DRY_RUN_FLAG_YES = 1;
+
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
index e3088d0..b1ed217 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
@@ -126,6 +126,12 @@ public class Command {
@TableField("environment_code")
private Long environmentCode;
+ /**
+ * dry run flag
+ */
+ @TableField("dry_run")
+ private int dryRun;
+
public Command() {
this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE;
@@ -145,7 +151,8 @@ public class Command {
Date scheduleTime,
String workerGroup,
Long environmentCode,
- Priority processInstancePriority) {
+ Priority processInstancePriority,
+ int dryRun) {
this.commandType = commandType;
this.executorId = executorId;
this.processDefinitionCode = processDefinitionCode;
@@ -160,6 +167,7 @@ public class Command {
this.workerGroup = workerGroup;
this.environmentCode = environmentCode;
this.processInstancePriority = processInstancePriority;
+ this.dryRun = dryRun;
}
public TaskDependType getTaskDependType() {
@@ -282,6 +290,14 @@ public class Command {
this.environmentCode = environmentCode;
}
+ public int getDryRun() {
+ return dryRun;
+ }
+
+ public void setDryRun(int dryRun) {
+ this.dryRun = dryRun;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -358,6 +374,7 @@ public class Command {
result = 31 * result + (updateTime != null ? updateTime.hashCode() :
0);
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() :
0);
result = 31 * result + (environmentCode != null ?
environmentCode.hashCode() : 0);
+ result = 31 * result + dryRun;
return result;
}
@@ -379,6 +396,7 @@ public class Command {
+ ", updateTime=" + updateTime
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentCode='" + environmentCode + '\''
+ + ", dryRun='" + dryRun + '\''
+ '}';
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
index d03570d..20df39b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
@@ -120,7 +120,12 @@ public class ErrorCommand {
*/
private Long environmentCode;
- public ErrorCommand(){}
+ /**
+ * dry run flag
+ */
+ private int dryRun;
+
+ public ErrorCommand() {}
public ErrorCommand(Command command, String message) {
this.id = command.getId();
@@ -138,6 +143,7 @@ public class ErrorCommand {
this.environmentCode = command.getEnvironmentCode();
this.processInstancePriority = command.getProcessInstancePriority();
this.message = message;
+ this.dryRun = command.getDryRun();
}
public TaskDependType getTaskDependType() {
@@ -268,6 +274,14 @@ public class ErrorCommand {
this.environmentCode = environmentCode;
}
+ public int getDryRun() {
+ return dryRun;
+ }
+
+ public void setDryRun(int dryRun) {
+ this.dryRun = dryRun;
+ }
+
@Override
public String toString() {
return "ErrorCommand{"
@@ -287,6 +301,7 @@ public class ErrorCommand {
+ ", message='" + message + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentCode='" + environmentCode + '\''
+ + ", dryRun='" + dryRun + '\''
+ '}';
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 693f019..18c386b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -239,6 +239,11 @@ public class ProcessInstance {
*/
private String varPool;
+ /**
+ * dry run flag
+ */
+ private int dryRun;
+
public ProcessInstance() {
}
@@ -503,6 +508,14 @@ public class ProcessInstance {
this.environmentCode = environmentCode;
}
+ public int getDryRun() {
+ return dryRun;
+ }
+
+ public void setDryRun(int dryRun) {
+ this.dryRun = dryRun;
+ }
+
/**
* add command to history
*
@@ -668,6 +681,9 @@ public class ProcessInstance {
+ ", processDefinitionVersion='"
+ processDefinitionVersion
+ '\''
+ + ", dryRun='"
+ + dryRun
+ + '\''
+ '}';
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 7948df4..ac18975 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -261,6 +261,11 @@ public class TaskInstance implements Serializable {
*/
private String taskParams;
+ /**
+ * dry run flag
+ */
+ private int dryRun;
+
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
@@ -529,6 +534,14 @@ public class TaskInstance implements Serializable {
this.executorName = executorName;
}
+ public int getDryRun() {
+ return dryRun;
+ }
+
+ public void setDryRun(int dryRun) {
+ this.dryRun = dryRun;
+ }
+
public boolean isTaskComplete() {
return this.getState().typeIsPause()
@@ -654,6 +667,7 @@ public class TaskInstance implements Serializable {
+ ", executorId=" + executorId
+ ", executorName='" + executorName + '\''
+ ", delayTime=" + delayTime
+ + ", dryRun=" + dryRun
+ '}';
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index d0e9141..ab70336 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -21,7 +21,7 @@
<select id="getOneToRun"
resultType="org.apache.dolphinscheduler.dao.entity.Command">
select cmd.id, cmd.command_type, cmd.process_definition_code,
cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
cmd.warning_type, cmd.warning_group_id, cmd.schedule_time,
cmd.start_time, cmd.executor_id, cmd.update_time,
- cmd.process_instance_priority, cmd.worker_group, cmd.environment_code
+ cmd.process_instance_priority, cmd.worker_group, cmd.environment_code,
cmd.dry_run
from t_ds_command cmd
join t_ds_process_definition definition on cmd.process_definition_code
= definition.code
where definition.release_state = 1 AND definition.flag = 1
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index f1b074d..08db3af 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -23,7 +23,7 @@
command_type, command_param, task_depend_type, max_try_times,
failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params,
flag,
update_time, is_sub_process, executor_id, history_cmd,
- process_instance_priority, worker_group,environment_code, timeout,
tenant_id, var_pool
+ process_instance_priority, worker_group,environment_code, timeout,
tenant_id, var_pool, dry_run
</sql>
<select id="queryDetailById"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@@ -90,7 +90,7 @@
<select id="queryProcessInstanceListPaging"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id,
instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state,
instance.schedule_time, instance.start_time,
- instance.end_time, instance.run_times, instance.recovery, instance.host
+ instance.end_time, instance.run_times, instance.recovery,
instance.host, instance.dry_run
from t_ds_process_instance instance
join t_ds_process_definition define ON
instance.process_definition_code = define.code
where instance.is_sub_process=0
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index dab824d..0964d1c 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -22,13 +22,13 @@
id, name, task_type, process_instance_id, task_code,
task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag,
retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority,
worker_group,environment_code , executor_id,
- first_submit_time, delay_time, task_params, var_pool
+ first_submit_time, delay_time, task_params, var_pool, dry_run
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code,
${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state,
${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host,
${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag,
${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times,
${alias}.task_instance_priority,
${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
- ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params,
${alias}.var_pool
+ ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params,
${alias}.var_pool, ${alias}.dry_run
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 9de1a0b..eae6abe 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -702,6 +702,9 @@ public class WorkflowExecuteThread implements Runnable {
// task instance flag
taskInstance.setFlag(Flag.YES);
+ // task dry run flag
+ taskInstance.setDryRun(processInstance.getDryRun());
+
// task instance retry times
taskInstance.setRetryTimes(0);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 04e4749..eaa7cc4 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -167,10 +167,14 @@ public class TaskExecuteThread implements Runnable,
Delayed {
}
logger.info("the task begins to execute. task instance id: {}",
taskExecutionContext.getTaskInstanceId());
+ TaskInstance taskInstance =
processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
+ int dryRun = taskInstance.getDryRun();
// copy hdfs/minio file to local
- downloadResource(taskExecutionContext.getExecutePath(),
- taskExecutionContext.getResources(),
- logger);
+ if (dryRun == Constants.DRY_RUN_FLAG_NO) {
+ downloadResource(taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getResources(),
+ logger);
+ }
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@@ -198,14 +202,20 @@ public class TaskExecuteThread implements Runnable,
Delayed {
this.task.init();
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
- // task handle
- this.task.handle();
- // task result process
- if (this.task.getNeedAlert()) {
- sendAlert(this.task.getTaskAlertInfo());
+ if (dryRun == Constants.DRY_RUN_FLAG_NO) {
+ // task handle
+ this.task.handle();
+
+ // task result process
+ if (this.task.getNeedAlert()) {
+ sendAlert(this.task.getTaskAlertInfo());
+ }
+ responseCommand.setStatus(this.task.getExitStatus().getCode());
+ } else {
+ responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
+ task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
}
- responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a5d1329..a36cadc 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -552,7 +552,8 @@ public class ProcessService {
processInstance.getScheduleTime(),
processInstance.getWorkerGroup(),
processInstance.getEnvironmentCode(),
- processInstance.getProcessInstancePriority()
+ processInstance.getProcessInstancePriority(),
+ processInstance.getDryRun()
);
saveCommand(command);
return;
@@ -632,6 +633,7 @@ public class ProcessService {
processInstance.setWarningType(warningType);
Integer warningGroupId = command.getWarningGroupId() == null ? 0 :
command.getWarningGroupId();
processInstance.setWarningGroupId(warningGroupId);
+ processInstance.setDryRun(command.getDryRun());
// schedule time
Date scheduleTime = getScheduleTime(command, cmdParam);
@@ -1292,7 +1294,8 @@ public class ProcessService {
parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
task.getEnvironmentCode(),
- parentProcessInstance.getProcessInstancePriority()
+ parentProcessInstance.getProcessInstancePriority(),
+ parentProcessInstance.getDryRun()
);
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 69b708a..e13558d 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -296,6 +296,7 @@ public class ProcessServiceTest {
commandParams.put(CMD_PARAM_START_PARAMS,
JSONUtils.toJsonString(startParams));
command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS);
+ command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
ProcessInstance processInstance1 =
processService.handleCommand(logger, host, validThreadNum, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
}
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 014a94a..f1c7f56 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -332,6 +332,7 @@ CREATE TABLE `t_ds_command` (
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance
priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) COMMENT 'worker group',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
+ `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag: 0 normal, 1 dry run',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@@ -381,6 +382,7 @@ CREATE TABLE `t_ds_error_command` (
`worker_group` varchar(64) COMMENT 'worker group',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`message` text COMMENT 'message',
+ `dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
@@ -586,6 +588,7 @@ CREATE TABLE `t_ds_process_instance` (
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`var_pool` longtext COMMENT 'var_pool',
+ `dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag: 0 normal, 1 dry run ',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE
@@ -822,6 +825,7 @@ CREATE TABLE `t_ds_task_instance` (
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
`var_pool` longtext COMMENT 'var_pool',
+ `dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
CONSTRAINT `foreign_key_instance_id` FOREIGN KEY (`process_instance_id`)
REFERENCES `t_ds_process_instance` (`id`) ON DELETE CASCADE
diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql
index 1c6694c..3054f19 100644
--- a/sql/dolphinscheduler_postgre.sql
+++ b/sql/dolphinscheduler_postgre.sql
@@ -254,6 +254,7 @@ CREATE TABLE t_ds_command (
process_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
environment_code bigint DEFAULT '-1',
+ dry_run int DEFAULT '0' ,
PRIMARY KEY (id)
) ;
@@ -297,6 +298,7 @@ CREATE TABLE t_ds_error_command (
worker_group varchar(64),
environment_code bigint DEFAULT '-1',
message text ,
+ dry_run int DEFAULT '0' ,
PRIMARY KEY (id)
);
--
@@ -490,6 +492,7 @@ CREATE TABLE t_ds_process_instance (
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
var_pool text ,
+ dry_run int DEFAULT '0' ,
PRIMARY KEY (id)
) ;
@@ -703,6 +706,7 @@ CREATE TABLE t_ds_task_instance (
first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' ,
var_pool text ,
+ dry_run int DEFAULT '0' ,
PRIMARY KEY (id),
CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id)
REFERENCES t_ds_process_instance(id) ON DELETE CASCADE
) ;