SbloodyS commented on a change in pull request #7771:
URL: https://github.com/apache/dolphinscheduler/pull/7771#discussion_r777823331
##########
File path:
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
##########
@@ -138,6 +139,100 @@ public Result startProcessInstance(@ApiIgnore
@RequestAttribute(value = Constant
return returnDataList(result);
}
+ /**
+ * batch execute process instance
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param processDefinitionCodes process definition codes
+ * @param scheduleTime schedule time
+ * @param failureStrategy failure strategy
+ * @param startNodeList start nodes list
+ * @param taskDependType task depend type
+ * @param execType execute type
+ * @param warningType warning type
+ * @param warningGroupId warning group id
+ * @param runMode run mode
+ * @param processInstancePriority process instance priority
+ * @param workerGroup worker group
+ * @param timeout timeout
+ * @param expectedParallelismNumber the expected parallelism number when
execute complement in parallel mode
+ * @return start process result code
+ */
+ @ApiOperation(value = "batchStartProcessInstance", notes =
"BATCH_RUN_PROCESS_INSTANCE_NOTES")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "processDefinitionCodes", value =
"PROCESS_DEFINITION_CODES", required = true, dataType = "String", example =
"1,2,3"),
+ @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME",
required = true, dataType = "String"),
+ @ApiImplicitParam(name = "failureStrategy", value =
"FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+ @ApiImplicitParam(name = "startNodeList", value =
"START_NODE_LIST", dataType = "String"),
+ @ApiImplicitParam(name = "taskDependType", value =
"TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+ @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE",
dataType = "CommandType"),
+ @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",
required = true, dataType = "WarningType"),
+ @ApiImplicitParam(name = "warningGroupId", value =
"WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+ @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType =
"RunMode"),
+ @ApiImplicitParam(name = "processInstancePriority", value =
"PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+ @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP",
dataType = "String", example = "default"),
+ @ApiImplicitParam(name = "environmentCode", value =
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
+ @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType =
"Int", example = "100"),
+ @ApiImplicitParam(name = "expectedParallelismNumber", value =
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+ })
+ @PostMapping(value = "batch-start-process-instance")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(START_PROCESS_INSTANCE_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value
= Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @RequestParam(value =
"processDefinitionCodes") String processDefinitionCodes,
+ @RequestParam(value = "scheduleTime",
required = false) String scheduleTime,
+ @RequestParam(value =
"failureStrategy", required = true) FailureStrategy failureStrategy,
+ @RequestParam(value = "startNodeList",
required = false) String startNodeList,
+ @RequestParam(value = "taskDependType",
required = false) TaskDependType taskDependType,
+ @RequestParam(value = "execType",
required = false) CommandType execType,
+ @RequestParam(value = "warningType",
required = true) WarningType warningType,
+ @RequestParam(value = "warningGroupId",
required = false) int warningGroupId,
+ @RequestParam(value = "runMode",
required = false) RunMode runMode,
+ @RequestParam(value =
"processInstancePriority", required = false) Priority processInstancePriority,
+ @RequestParam(value = "workerGroup",
required = false, defaultValue = "default") String workerGroup,
+ @RequestParam(value =
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
+ @RequestParam(value = "timeout",
required = false) Integer timeout,
+ @RequestParam(value = "startParams",
required = false) String startParams,
+ @RequestParam(value =
"expectedParallelismNumber", required = false) Integer
expectedParallelismNumber,
+ @RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun) {
+
+ if (timeout == null) {
+ timeout = Constants.MAX_TASK_TIMEOUT;
+ }
+ Map<String, String> startParamMap = null;
+ if (startParams != null) {
+ startParamMap = JSONUtils.toMap(startParams);
+ }
+
+ Map<String, Object> result = new HashMap<>();
+ String[] processDefinitionCodeArray =
processDefinitionCodes.split(",");
+ Set<Long> processDefinitionCodeSet = new HashSet<>();
+ List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
+
+ for (String StrProcessDefinitionCode :processDefinitionCodeArray) {
+ long processDefinitionCode =
Long.parseLong(StrProcessDefinitionCode);
+ processDefinitionCodeSet.add(processDefinitionCode);
+ }
+
+ for (long processDefinitionCode : processDefinitionCodeSet) {
+ result = execService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, scheduleTime, execType, failureStrategy,
+ startNodeList, taskDependType, warningType,
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,
timeout, startParamMap, expectedParallelismNumber, dryRun);
+
+ if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
+
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
+ }
Review comment:
I think we should not terminate the remaining starts if any code in
```processDefinitionCodeSet``` fails, for example because a process definition
is offline or a process definition code is invalid.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]