zhongjiajie commented on code in PR #13094:
URL:
https://github.com/apache/dolphinscheduler/pull/13094#discussion_r1039385375
##########
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java:
##########
@@ -1068,11 +1068,23 @@ public void testUpdateProcessDefinitionV2() {
((ServiceException) exception).getCode());
// success
+
Mockito.when(processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()))
Review Comment:
We should add more tests about the version, including:
1. update the workflow entity only
2. update a task only
3. update the task dependencies only
All three situations should check whether the workflow and the workflow task
relations are correct, and that all tasks are still in the workflow.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition
updateSingleProcessDefinition(User loginUser,
}
processDefinitionUpdate.setTenantId(tenant.getId());
}
- int update =
processDefinitionMapper.updateById(processDefinitionUpdate);
- if (update <= 0) {
+ int insertVersion = this.saveProcessDefine(loginUser,
processDefinitionUpdate);
+ if (insertVersion == 0) {
+ logger.error("Update process definition error, projectCode:{},
processDefinitionName:{}.",
+ processDefinitionUpdate.getCode(),
+ processDefinitionUpdate.getName());
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
- this.syncObj2Log(loginUser, processDefinition);
- return processDefinition;
+
+ int insertRelationVersion = this.saveTaskRelation(loginUser,
processDefinitionUpdate, insertVersion);
+ if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+ logger.error("Save process task relations error, projectCode:{},
processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion);
+ throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ } else {
+ logger.info("Save process task relations complete, projectCode:{},
processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion);
+ }
Review Comment:
good catch
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java:
##########
@@ -460,10 +474,19 @@ public List<ProcessTaskRelation>
updateUpstreamTaskDefinition(User loginUser,
// create relation not exists
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
for (long createCode : taskCodeCreates) {
- TaskDefinition upstreamTask =
taskDefinitionMapper.queryByCode(createCode);
+ long upstreamCode = 0L;
+ int version = 0;
+ if (createCode != 0L) {
Review Comment:
```suggestion
// 0 for DAG root, should not, it may already exists and skip to
create anymore
if (createCode != 0L) {
```
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition
updateSingleProcessDefinition(User loginUser,
}
processDefinitionUpdate.setTenantId(tenant.getId());
}
- int update =
processDefinitionMapper.updateById(processDefinitionUpdate);
- if (update <= 0) {
+ int insertVersion = this.saveProcessDefine(loginUser,
processDefinitionUpdate);
+ if (insertVersion == 0) {
+ logger.error("Update process definition error, projectCode:{},
processDefinitionName:{}.",
+ processDefinitionUpdate.getCode(),
+ processDefinitionUpdate.getName());
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
- this.syncObj2Log(loginUser, processDefinition);
- return processDefinition;
+
+ int insertRelationVersion = this.saveTaskRelation(loginUser,
processDefinitionUpdate, insertVersion);
+ if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+ logger.error("Save process task relations error, projectCode:{},
processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion);
+ throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ } else {
+ logger.info("Save process task relations complete, projectCode:{},
processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion);
+ }
+ processDefinitionUpdate.setVersion(insertVersion);
+ return processDefinitionUpdate;
+ }
+ public int saveProcessDefine(User loginUser, ProcessDefinition
processDefinition) {
+ ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
+ Integer version =
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+ int insertVersion = version == null || version == 0 ?
Constants.VERSION_FIRST : version + 1;
+ processDefinitionLog.setVersion(insertVersion);
+
+ processDefinitionLog.setOperator(loginUser.getId());
+ processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+ processDefinitionLog.setId(null);
+ int insertLog =
processDefinitionLogMapper.insert(processDefinitionLog);
+ processDefinitionLog.setId(processDefinition.getId());
+ int result = processDefinitionMapper.updateById(processDefinitionLog);
Review Comment:
Yeah, it seems you directly copy-pasted the code from the process service to
here, but would it be better to refactor it so that it makes more sense? When we
want to operate on `processDefinitionMapper`, we should use a `ProcessDefinition`
object instead of a `ProcessDefinitionLog`.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java:
##########
@@ -97,7 +97,7 @@ public class TaskCreateRequest {
private Integer memoryMax;
@Schema(example = "upstream-task-codes1,upstream-task-codes2", description
= "use , to split multiple upstream task codes")
- private String upstreamTasksCodes;
+ private String upstreamTasksCodes = "0";
Review Comment:
`"0"` means the root node of the DAG; maybe we should add a constant for this.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2775,12 +2776,104 @@ public ProcessDefinition
updateSingleProcessDefinition(User loginUser,
}
processDefinitionUpdate.setTenantId(tenant.getId());
}
- int update =
processDefinitionMapper.updateById(processDefinitionUpdate);
- if (update <= 0) {
+ int insertVersion = this.saveProcessDefine(loginUser,
processDefinitionUpdate);
+ if (insertVersion == 0) {
+ logger.error("Update process definition error, projectCode:{},
processDefinitionName:{}.",
+ processDefinitionUpdate.getCode(),
+ processDefinitionUpdate.getName());
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
- this.syncObj2Log(loginUser, processDefinition);
- return processDefinition;
+
+ int insertRelationVersion = this.saveTaskRelation(loginUser,
processDefinitionUpdate, insertVersion);
+ if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
+ logger.error("Save process task relations error, projectCode:{},
processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion);
+ throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
+ } else {
+ logger.info("Save process task relations complete, projectCode:{},
processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion);
+ }
+ processDefinitionUpdate.setVersion(insertVersion);
+ return processDefinitionUpdate;
+ }
+ public int saveProcessDefine(User loginUser, ProcessDefinition
processDefinition) {
+ ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
+ Integer version =
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+ int insertVersion = version == null || version == 0 ?
Constants.VERSION_FIRST : version + 1;
+ processDefinitionLog.setVersion(insertVersion);
+
+ processDefinitionLog.setOperator(loginUser.getId());
+ processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
+ processDefinitionLog.setId(null);
+ int insertLog =
processDefinitionLogMapper.insert(processDefinitionLog);
+ processDefinitionLog.setId(processDefinition.getId());
+ int result = processDefinitionMapper.updateById(processDefinitionLog);
Review Comment:
And BTW, as the ProcessDefinitionLog is a snapshot of ProcessDefinition, I
think we had better update the ProcessDefinition before we insert the
ProcessDefinitionLog, so that it makes more sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]