This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f254124 [Feature-7769][API] Add batch start process api (#7771)
f254124 is described below
commit f2541248eb85f7977f75a3be3c7419688d20d713
Author: xiangzihao <[email protected]>
AuthorDate: Fri Jan 7 12:26:06 2022 +0800
[Feature-7769][API] Add batch start process api (#7771)
* add notes
Co-authored-by: SbloodyS <[email protected]>
---
.../api/controller/ExecutorController.java | 104 ++++++++++++++++++++-
.../apache/dolphinscheduler/api/enums/Status.java | 1 +
.../src/main/resources/i18n/messages.properties | 5 +-
.../main/resources/i18n/messages_en_US.properties | 2 +
.../main/resources/i18n/messages_zh_CN.properties | 1 +
5 files changed, 109 insertions(+), 4 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 3cff198..4f651c0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANC
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
+import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -36,8 +37,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
-import java.util.Map;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
@@ -55,6 +54,13 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* executor controller
*/
@@ -139,6 +145,100 @@ public class ExecutorController extends BaseController {
}
/**
+ * batch execute process instance
+ * If any processDefinitionCode cannot be found, the failure information is returned and the status is set to
+ * failed. The successful task will run normally and will not stop
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param processDefinitionCodes process definition codes
+ * @param scheduleTime schedule time
+ * @param failureStrategy failure strategy
+ * @param startNodeList start nodes list
+ * @param taskDependType task depend type
+ * @param execType execute type
+ * @param warningType warning type
+ * @param warningGroupId warning group id
+ * @param runMode run mode
+ * @param processInstancePriority process instance priority
+ * @param workerGroup worker group
+ * @param timeout timeout
+ * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
+ * @return start process result code
+ */
+ @ApiOperation(value = "batchStartProcessInstance", notes =
"BATCH_RUN_PROCESS_INSTANCE_NOTES")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "processDefinitionCodes", value =
"PROCESS_DEFINITION_CODES", required = true, dataType = "String", example =
"1,2,3"),
+ @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME",
required = true, dataType = "String"),
+ @ApiImplicitParam(name = "failureStrategy", value =
"FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+ @ApiImplicitParam(name = "startNodeList", value =
"START_NODE_LIST", dataType = "String"),
+ @ApiImplicitParam(name = "taskDependType", value =
"TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+ @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE",
dataType = "CommandType"),
+ @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",
required = true, dataType = "WarningType"),
+ @ApiImplicitParam(name = "warningGroupId", value =
"WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+ @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType =
"RunMode"),
+ @ApiImplicitParam(name = "processInstancePriority", value =
"PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+ @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP",
dataType = "String", example = "default"),
+ @ApiImplicitParam(name = "environmentCode", value =
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
+ @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType =
"Int", example = "100"),
+ @ApiImplicitParam(name = "expectedParallelismNumber", value =
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+ })
+ @PostMapping(value = "batch-start-process-instance")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(START_PROCESS_INSTANCE_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value
= Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @RequestParam(value =
"processDefinitionCodes") String processDefinitionCodes,
+ @RequestParam(value = "scheduleTime",
required = false) String scheduleTime,
+ @RequestParam(value =
"failureStrategy", required = true) FailureStrategy failureStrategy,
+ @RequestParam(value = "startNodeList",
required = false) String startNodeList,
+ @RequestParam(value = "taskDependType",
required = false) TaskDependType taskDependType,
+ @RequestParam(value = "execType",
required = false) CommandType execType,
+ @RequestParam(value = "warningType",
required = true) WarningType warningType,
+ @RequestParam(value = "warningGroupId",
required = false) int warningGroupId,
+ @RequestParam(value = "runMode",
required = false) RunMode runMode,
+ @RequestParam(value =
"processInstancePriority", required = false) Priority processInstancePriority,
+ @RequestParam(value = "workerGroup",
required = false, defaultValue = "default") String workerGroup,
+ @RequestParam(value =
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
+ @RequestParam(value = "timeout",
required = false) Integer timeout,
+ @RequestParam(value = "startParams",
required = false) String startParams,
+ @RequestParam(value =
"expectedParallelismNumber", required = false) Integer
expectedParallelismNumber,
+ @RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun) {
+
+ if (timeout == null) {
+ timeout = Constants.MAX_TASK_TIMEOUT;
+ }
+
+ Map<String, String> startParamMap = null;
+ if (startParams != null) {
+ startParamMap = JSONUtils.toMap(startParams);
+ }
+
+ Map<String, Object> result = new HashMap<>();
+ List<String> processDefinitionCodeArray =
Arrays.asList(processDefinitionCodes.split(Constants.COMMA));
+ List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
+
+ processDefinitionCodeArray =
processDefinitionCodeArray.stream().distinct().collect(Collectors.toList());
+
+ for (String strProcessDefinitionCode : processDefinitionCodeArray) {
+ long processDefinitionCode =
Long.parseLong(strProcessDefinitionCode);
+ result = execService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, scheduleTime, execType, failureStrategy,
+ startNodeList, taskDependType, warningType,
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,
timeout, startParamMap, expectedParallelismNumber, dryRun);
+
+ if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
+
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
+ }
+ }
+
+ if (!startFailedProcessDefinitionCodeList.isEmpty()) {
+ putMsg(result, Status.BATCH_START_PROCESS_INSTANCE_ERROR,
String.join(Constants.COMMA, startFailedProcessDefinitionCodeList));
+ }
+
+ return returnDataList(result);
+ }
+
+ /**
* do action to process instance:pause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index ec4931b..b5202c5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -251,6 +251,7 @@ public enum Status {
COUNT_PROCESS_INSTANCE_STATE_ERROR(50012, "count process instance state
error", "查询各状态流程实例数错误"),
COUNT_PROCESS_DEFINITION_USER_ERROR(50013, "count process definition user
error", "查询各用户流程定义数错误"),
START_PROCESS_INSTANCE_ERROR(50014, "start process instance error",
"运行工作流实例错误"),
+ BATCH_START_PROCESS_INSTANCE_ERROR(50014, "batch start process instance
error: {0}", "批量运行工作流实例错误: {0}"),
EXECUTE_PROCESS_INSTANCE_ERROR(50015, "execute process instance error",
"操作工作流实例错误"),
CHECK_PROCESS_DEFINITION_ERROR(50016, "check process definition error",
"工作流定义错误"),
QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017, "query
recipients and copyers by process definition error", "查询收件人和抄送人错误"),
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties
index 7a4282a..928e608 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties
@@ -18,13 +18,14 @@
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
-RUN_PROCESS_INSTANCE_NOTES=run process instance
+RUN_PROCESS_INSTANCE_NOTES=run process instance
+BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
RUN_MODE=run mode
TIMEOUT=timeout
-EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
+EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_TYPE=execute type
START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
GET_RECEIVER_CC_NOTES=query receiver cc
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
index 56800f2..a6ef33a 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
@@ -18,6 +18,8 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
+BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance(If any processDefinitionCode cannot be found, the failure\
+ \ information is returned and the status is set to failed. The successful task will run normally and will not stop)
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
index 6343d89..e5e7476 100644
--- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
+++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
@@ -19,6 +19,7 @@ PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作
UI_PLUGINS_TAG=UI插件相关操作
WORK_FLOW_LINEAGE_TAG=工作流血缘相关操作
RUN_PROCESS_INSTANCE_NOTES=运行流程实例
+BATCH_RUN_PROCESS_INSTANCE_NOTES=批量运行流程实例(其中有任意一个processDefinitionCode找不到,则返回失败信息并且状态置为失败,成功的任务会正常运行,不会停止)
START_NODE_LIST=开始节点列表(节点name)
TASK_DEPEND_TYPE=任务依赖类型
COMMAND_TYPE=指令类型