github-code-scanning[bot] commented on code in PR #13023:
URL:
https://github.com/apache/dolphinscheduler/pull/13023#discussion_r1033314140
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java:
##########
@@ -812,78 +811,89 @@
if (result.get(Constants.STATUS) != Status.SUCCESS &&
taskDefinitionToUpdate == null) {
return result;
}
+ // get sourceUpstreamTaskCodeSet
List<ProcessTaskRelation> upstreamTaskRelations =
processTaskRelationMapper.queryUpstreamByCode(projectCode,
taskCode);
- Set<Long> upstreamCodeSet =
+ Set<Long> sourceUpstreamCodeSet =
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
- Set<Long> upstreamTaskCodes = Collections.emptySet();
- if (StringUtils.isNotEmpty(upstreamCodes)) {
- upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
- .collect(Collectors.toSet());
- }
- if (CollectionUtils.isEqualCollection(upstreamCodeSet,
upstreamTaskCodes) && taskDefinitionToUpdate == null) {
+ // get updateUpstreamTaskCodeSet
+ Set<Long> updateUpstreamTaskCodeSet = getTaskCodeSet(upstreamCodes,
result);
+
+ if (CollectionUtils.isEqualCollection(sourceUpstreamCodeSet,
updateUpstreamTaskCodeSet)
+ && taskDefinitionToUpdate == null) {
putMsg(result, Status.SUCCESS);
return result;
- } else {
- if (taskDefinitionToUpdate == null) {
- taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
- }
+ } else if (taskDefinitionToUpdate == null) {
+ taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
}
- Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
- if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
- List<TaskDefinition> upstreamTaskDefinitionList =
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
- queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream()
- .collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinition -> taskDefinition));
- // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
- upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
- if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
- String notExistTaskCodes = StringUtils.join(upstreamTaskCodes,
Constants.COMMA);
- logger.error("Some task definitions in parameter
upstreamTaskCodes do not exist, notExistTaskCodes:{}.",
- notExistTaskCodes);
- putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
notExistTaskCodes);
- return result;
- }
- } else {
- queryUpStreamTaskCodeMap = new HashMap<>();
+ // get survive updateUpstreamTask
+ getUpdateUpstreamTaskCodeMap(updateUpstreamTaskCodeSet, result);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
}
- if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
+ // update log
+ if (CollectionUtils.isNotEmpty(updateUpstreamTaskCodeSet)) {
ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
taskRelation.getProcessDefinitionCode());
- List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
- List<ProcessTaskRelation> relationList = Lists.newArrayList();
- for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
- if (processTaskRelation.getPostTaskCode() == taskCode) {
- if
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode())
- && processTaskRelation.getPreTaskCode() != 0L) {
-
queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode());
- } else {
- processTaskRelation.setPreTaskCode(0L);
- processTaskRelation.setPreTaskVersion(0);
- relationList.add(processTaskRelation);
- }
- }
- }
- processTaskRelationList.removeAll(relationList);
- for (Map.Entry<Long, TaskDefinition> queryUpStreamTask :
queryUpStreamTaskCodeMap.entrySet()) {
- taskRelation.setPreTaskCode(queryUpStreamTask.getKey());
-
taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion());
- processTaskRelationList.add(taskRelation);
- }
- if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) &&
CollectionUtils.isNotEmpty(processTaskRelationList)) {
- processTaskRelationList.add(processTaskRelationList.get(0));
- }
updateDag(loginUser, taskRelation.getProcessDefinitionCode(),
processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
logger.info(
"Update task with upstream tasks complete, projectCode:{},
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
- projectCode, taskCode, upstreamTaskCodes);
+ projectCode, taskCode, updateUpstreamTaskCodeSet);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}
+ /**
+ * analysis upstreamCodes by Constants.COMMA
+ * @param upstreamCodes need analysis string
+ * @param result result
+ * @return codes set
+ */
+ private Set<Long> getTaskCodeSet(String upstreamCodes, Map<String, Object>
result) {
+ if (StringUtils.isNotEmpty(upstreamCodes)) {
+ try {
+ return
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
+ .collect(Collectors.toSet());
+ } catch (NumberFormatException e) {
+ logger.error("upstreamCodes numberFormatException : {}",
upstreamCodes);
Review Comment:
## Log Injection
This log entry depends on a [user-provided value](1).
[Show more
details](https://github.com/apache/dolphinscheduler/security/code-scanning/2358)
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java:
##########
@@ -812,78 +811,89 @@
if (result.get(Constants.STATUS) != Status.SUCCESS &&
taskDefinitionToUpdate == null) {
return result;
}
+ // get sourceUpstreamTaskCodeSet
List<ProcessTaskRelation> upstreamTaskRelations =
processTaskRelationMapper.queryUpstreamByCode(projectCode,
taskCode);
- Set<Long> upstreamCodeSet =
+ Set<Long> sourceUpstreamCodeSet =
upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
- Set<Long> upstreamTaskCodes = Collections.emptySet();
- if (StringUtils.isNotEmpty(upstreamCodes)) {
- upstreamTaskCodes =
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
- .collect(Collectors.toSet());
- }
- if (CollectionUtils.isEqualCollection(upstreamCodeSet,
upstreamTaskCodes) && taskDefinitionToUpdate == null) {
+ // get updateUpstreamTaskCodeSet
+ Set<Long> updateUpstreamTaskCodeSet = getTaskCodeSet(upstreamCodes,
result);
+
+ if (CollectionUtils.isEqualCollection(sourceUpstreamCodeSet,
updateUpstreamTaskCodeSet)
+ && taskDefinitionToUpdate == null) {
putMsg(result, Status.SUCCESS);
return result;
- } else {
- if (taskDefinitionToUpdate == null) {
- taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
- }
+ } else if (taskDefinitionToUpdate == null) {
+ taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
}
- Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
- if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
- List<TaskDefinition> upstreamTaskDefinitionList =
taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
- queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream()
- .collect(Collectors.toMap(TaskDefinition::getCode,
taskDefinition -> taskDefinition));
- // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
- upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
- if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
- String notExistTaskCodes = StringUtils.join(upstreamTaskCodes,
Constants.COMMA);
- logger.error("Some task definitions in parameter
upstreamTaskCodes do not exist, notExistTaskCodes:{}.",
- notExistTaskCodes);
- putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
notExistTaskCodes);
- return result;
- }
- } else {
- queryUpStreamTaskCodeMap = new HashMap<>();
+ // get survive updateUpstreamTask
+ getUpdateUpstreamTaskCodeMap(updateUpstreamTaskCodeSet, result);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
}
- if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
+ // update log
+ if (CollectionUtils.isNotEmpty(updateUpstreamTaskCodeSet)) {
ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode,
taskRelation.getProcessDefinitionCode());
- List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
- List<ProcessTaskRelation> relationList = Lists.newArrayList();
- for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
- if (processTaskRelation.getPostTaskCode() == taskCode) {
- if
(queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode())
- && processTaskRelation.getPreTaskCode() != 0L) {
-
queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode());
- } else {
- processTaskRelation.setPreTaskCode(0L);
- processTaskRelation.setPreTaskVersion(0);
- relationList.add(processTaskRelation);
- }
- }
- }
- processTaskRelationList.removeAll(relationList);
- for (Map.Entry<Long, TaskDefinition> queryUpStreamTask :
queryUpStreamTaskCodeMap.entrySet()) {
- taskRelation.setPreTaskCode(queryUpStreamTask.getKey());
-
taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion());
- processTaskRelationList.add(taskRelation);
- }
- if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) &&
CollectionUtils.isNotEmpty(processTaskRelationList)) {
- processTaskRelationList.add(processTaskRelationList.get(0));
- }
updateDag(loginUser, taskRelation.getProcessDefinitionCode(),
processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
logger.info(
"Update task with upstream tasks complete, projectCode:{},
taskDefinitionCode:{}, upstreamTaskCodes:{}.",
- projectCode, taskCode, upstreamTaskCodes);
+ projectCode, taskCode, updateUpstreamTaskCodeSet);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}
+ /**
+ * analysis upstreamCodes by Constants.COMMA
+ * @param upstreamCodes need analysis string
+ * @param result result
+ * @return codes set
+ */
+ private Set<Long> getTaskCodeSet(String upstreamCodes, Map<String, Object>
result) {
+ if (StringUtils.isNotEmpty(upstreamCodes)) {
+ try {
+ return
Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong)
Review Comment:
## Missing catch of NumberFormatException
Potential uncaught 'java.lang.NumberFormatException'.
[Show more
details](https://github.com/apache/dolphinscheduler/security/code-scanning/2357)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]