This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ae847ba331 [Fix-15036] [API] Fix task definition edit doesn't work
(#14801)
ae847ba331 is described below
commit ae847ba331ac6edec3c6b2e7013098bb274a5871
Author: 小可耐 <[email protected]>
AuthorDate: Wed Oct 25 20:25:02 2023 +0800
[Fix-15036] [API] Fix task definition edit doesn't work (#14801)
* [Bug] [task definition] 修复任务定义修改,工作流无法生效问题
* [Bug] [API] Fix task check apply
* [BUG][API] fix updateTaskDefinition method ut
* [BUG][API] fix updateTaskDefinition method code specification
* [BUG][API] fix one task corresponds to multiple workflow problems
* [BUG][API] queryProcessTaskRelationByTaskCodeAndTaskVersion method adjust
* [BUG][API] fix updateTaskDefinition method update
---------
Co-authored-by: David Zollo <[email protected]>
Co-authored-by: 旺阳 <[email protected]>
Co-authored-by: xiangzihao <[email protected]>
---
.../service/impl/TaskDefinitionServiceImpl.java | 68 ++++++++++++++++++----
.../api/service/TaskDefinitionServiceImplTest.java | 10 +++-
.../dao/mapper/ProcessTaskRelationMapper.java | 10 ++++
.../dao/mapper/ProcessTaskRelationMapper.xml | 18 +++++-
4 files changed, 92 insertions(+), 14 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 838744767a..303e3e62b0 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -51,6 +51,7 @@ import
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -58,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -141,6 +143,9 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
@Autowired
private ProcessDefinitionService processDefinitionService;
+ @Autowired
+ private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
/**
* create task definition
*
@@ -781,20 +786,59 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
projectCode, taskCode,
taskDefinitionToUpdate.getVersion());
// update process task relation
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper
- .queryByTaskCode(taskDefinitionToUpdate.getCode());
+
.queryProcessTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(),
+ taskDefinition.getVersion());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
- for (ProcessTaskRelation processTaskRelation :
processTaskRelations) {
- if (taskCode == processTaskRelation.getPreTaskCode()) {
- processTaskRelation.setPreTaskVersion(version);
- } else if (taskCode == processTaskRelation.getPostTaskCode()) {
- processTaskRelation.setPostTaskVersion(version);
+ Map<Long, List<ProcessTaskRelation>> processTaskRelationGroupList
= processTaskRelations.stream()
+
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
+ for (Map.Entry<Long, List<ProcessTaskRelation>>
processTaskRelationMap : processTaskRelationGroupList
+ .entrySet()) {
+ Long processDefinitionCode = processTaskRelationMap.getKey();
+ int processDefinitionVersion =
+
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode)
+ + 1;
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMap.getValue();
+ for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
+ if (taskCode == processTaskRelation.getPreTaskCode()) {
+ processTaskRelation.setPreTaskVersion(version);
+ } else if (taskCode ==
processTaskRelation.getPostTaskCode()) {
+ processTaskRelation.setPostTaskVersion(version);
+ }
+
processTaskRelation.setProcessDefinitionVersion(processDefinitionVersion);
+ int updateProcessDefinitionVersionCount =
+
processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
+ if (updateProcessDefinitionVersionCount != 1) {
+ log.error("batch update process task relation error,
projectCode:{}, taskDefinitionCode:{}.",
+ projectCode, taskCode);
+ putMsg(result,
Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+ throw new
ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+ }
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
+ processTaskRelationLog.setOperator(loginUser.getId());
+ processTaskRelationLog.setId(null);
+ processTaskRelationLog.setOperateTime(now);
+ int insertProcessTaskRelationLogCount =
processTaskRelationLogDao.insert(processTaskRelationLog);
+ if (insertProcessTaskRelationLogCount != 1) {
+ log.error("batch update process task relation error,
projectCode:{}, taskDefinitionCode:{}.",
+ projectCode, taskCode);
+ putMsg(result,
Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
+ throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
+ }
}
- int count =
processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
- if (count != 1) {
- log.error("batch update process task relation error,
projectCode:{}, taskDefinitionCode:{}.",
- projectCode, taskCode);
- putMsg(result,
Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
- throw new
ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
+ ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefinitionCode);
+ processDefinition.setVersion(processDefinitionVersion);
+ processDefinition.setUpdateTime(now);
+ processDefinition.setUserId(loginUser.getId());
+ // update process definition
+ int updateProcessDefinitionCount =
processDefinitionMapper.updateById(processDefinition);
+ ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
+ processDefinitionLog.setOperateTime(now);
+ processDefinitionLog.setId(null);
+ processDefinitionLog.setOperator(loginUser.getId());
+ int insertProcessDefinitionLogCount =
processDefinitionLogMapper.insert(processDefinitionLog);
+ if ((updateProcessDefinitionCount &
insertProcessDefinitionLogCount) != 1) {
+ putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ throw new
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
}
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 26091e7529..2644712074 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -129,6 +129,9 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessTaskRelationLogDao processTaskRelationLogDao;
+ @Mock
+ private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
private static final String TASK_PARAMETER =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
private static final long PROJECT_CODE = 1L;
@@ -188,9 +191,14 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new
TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
+
Mockito.when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1);
+ Mockito.when(processDefinitionMapper.queryByCode(2L)).thenReturn(new
ProcessDefinition());
+
Mockito.when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1);
+
Mockito.when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
-
Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
+
Mockito.when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE,
0))
+ .thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user,
PROJECT_CODE, TASK_CODE, taskDefinitionJson);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 9b8dfc8736..07ff5c2d86 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -225,4 +225,14 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
void deleteByWorkflowDefinitionCode(@Param("workflowDefinitionCode") long
workflowDefinitionCode,
@Param("workflowDefinitionVersion")
int workflowDefinitionVersion);
+
+ /**
+ * process task relation by taskCode and postTaskVersion
+ *
+ * @param taskCode taskCode
+ * @param postTaskVersion postTaskVersion
+ * @return ProcessTaskRelation
+ */
+ List<ProcessTaskRelation>
queryProcessTaskRelationByTaskCodeAndTaskVersion(@Param("taskCode") long
taskCode,
+
@Param("postTaskVersion") long postTaskVersion);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 2da8514c55..742d53726e 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -231,7 +231,8 @@
<update id="updateProcessTaskRelationTaskVersion">
update t_ds_process_task_relation
set pre_task_version=#{processTaskRelation.preTaskVersion},
- post_task_version=#{processTaskRelation.postTaskVersion}
+ post_task_version=#{processTaskRelation.postTaskVersion},
+
process_definition_version=#{processTaskRelation.processDefinitionVersion}
where id = #{processTaskRelation.id}
</update>
@@ -240,4 +241,19 @@
from t_ds_process_task_relation
where process_definition_code = #{workflowDefinitionCode} and
process_definition_version = #{workflowDefinitionVersion}
</delete>
+
+ <select id="queryProcessTaskRelationByTaskCodeAndTaskVersion"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation
+ WHERE process_definition_code in (
+ SELECT
+ process_definition_code
+ FROM
+ t_ds_process_task_relation
+ WHERE
+ post_task_code = #{taskCode}
+ and post_task_version =
#{postTaskVersion}
+ )
+ </select>
</mapper>