DarkAssassinator commented on code in PR #12051:
URL: https://github.com/apache/dolphinscheduler/pull/12051#discussion_r978231655
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -306,11 +311,64 @@ public Map<String, Object>
checkProcessDefinitionValid(long projectCode, Process
logger.warn("Subprocess definition of process definition is not
{}, processDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(),
processDefineCode);
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
} else {
- result.put(Constants.STATUS, Status.SUCCESS);
+ List<String> workerGroupNames =
workerGroupService.getAllWorkerGroupNames();
+ return checkWorkerGroupNameExists(processDefinition,
workerGroupNames);
}
return result;
}
+ /**
+ * check whether worker group is available
+ *
+ * @param processDefinition process definition
+ * @param workerGroupNames worker group name list
+ * @return check result
+ */
+ public Map<String, Object> checkWorkerGroupNameExists(ProcessDefinition
processDefinition,
+ List<String> workerGroupNames) {
+ Map<String, Object> result = new HashMap<>();
+ // get all task definitions in this process definition
+ List<ProcessTaskRelation> processTaskRelations = processService
+ .findRelationByCode(processDefinition.getCode(),
processDefinition.getVersion());
+ List<TaskDefinitionLog> taskDefinitionLogList = processService
+ .genTaskDefineList(processTaskRelations);
+ List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream()
+ .map(t -> (TaskDefinition) t).collect(
+ Collectors.toList());
+
+ for (TaskDefinition taskDefinition : taskDefinitions) {
+ if (!workerGroupNames.contains(taskDefinition.getWorkerGroup())) {
+ logger.error("Cannot find worker group {} configured on task
definition named {} ",
+ taskDefinition.getWorkerGroup(), taskDefinition.getName());
+ putMsg(result, Status.WORKER_GROUP_NOT_EXISTS,
taskDefinition.getName(),
+ taskDefinition.getWorkerGroup());
+ return result;
+ }
+
+ if (TaskConstants.TASK_TYPE_SUB_PROCESS
+ .equalsIgnoreCase(taskDefinition.getTaskType())) {
+ long subProcessCode = Long
+
.parseLong(JSONUtils.getNodeString(taskDefinition.getTaskParams(),
+ Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE));
+
+ ProcessDefinition subProcessDefinition =
processDefinitionMapper
+ .queryByCode(subProcessCode);
+ if (subProcessDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(subProcessCode));
+ return result;
+ }
+ // check all sub process recursively
+ Map<String, Object> subResult =
checkWorkerGroupNameExists(subProcessDefinition,
Review Comment:
> I am not sure if this will exist a cycle here.
because workflow is a DAG, so this change like a DFS, as i think it will not
exist a cycle
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]