This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new db96bf2 [Feature][JsonSplit] Fix batchMove of processDefinition bug
(#5371)
db96bf2 is described below
commit db96bf2dfece57e75db6238eda13206fe335e7de
Author: JinyLeeChina <[email protected]>
AuthorDate: Sat Apr 24 15:33:23 2021 +0800
[Feature][JsonSplit] Fix batchMove of processDefinition bug (#5371)
* update SnowFlake
* update processDefinite from processInstance
* update processDefinite from processInstance
* Fix task logger path
* Fix dependTask bug
* Fix batchMove of processDefinition bug
* codeStyle
Co-authored-by: JinyLeeChina <[email protected]>
---
.../service/impl/ProcessDefinitionServiceImpl.java | 116 ++++++++++-----------
.../service/process/ProcessService.java | 54 ++--------
2 files changed, 65 insertions(+), 105 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 75fe58a..41b97d8 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -155,6 +156,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
+ @Autowired
private SchedulerService schedulerService;
/**
@@ -563,7 +565,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
// To check resources whether they are already cancel
authorized or deleted
String resourceIds = processDefinition.getResourceIds();
if (StringUtils.isNotBlank(resourceIds)) {
- Integer[] resourceIdArray =
Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
+ Integer[] resourceIdArray =
Arrays.stream(resourceIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
PermissionCheck<Integer> permissionCheck = new
PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService,
resourceIdArray, loginUser.getId(), logger);
try {
permissionCheck.checkPermission();
@@ -1463,8 +1465,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return checkResult;
}
- // TODO:
- // Project targetProject =
projectMapper.queryDetailByCode(targetProjectCode);
Project targetProject = projectMapper.queryDetailById(targetProjectId);
if (targetProject == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);
@@ -1501,7 +1501,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
int targetProjectId)
{
Map<String, Object> result = new HashMap<>();
List<String> failedProcessList = new ArrayList<>();
-
//check src project auth
Map<String, Object> checkResult = checkProjectAndAuth(loginUser,
projectName);
if (checkResult != null) {
@@ -1513,8 +1512,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- // TODO :
- // Project targetProject =
projectMapper.queryDetailByCode(targetProjectCode);
Project targetProject = projectMapper.queryDetailById(targetProjectId);
if (targetProject == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);
@@ -1528,14 +1525,63 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
}
- String[] processDefinitionIdList =
processDefinitionIds.split(Constants.COMMA);
- doBatchMoveProcessDefinition(targetProject, failedProcessList,
processDefinitionIdList);
+ Integer[] definitionIds =
Arrays.stream(processDefinitionIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
+ List<ProcessDefinition> processDefinitionList =
processDefinitionMapper.queryDefinitionListByIdList(definitionIds);
+ for (ProcessDefinition processDefinition : processDefinitionList) {
+ ProcessDefinitionLog processDefinitionLog =
moveProcessDefinition(loginUser, targetProject.getCode(), processDefinition,
result, failedProcessList);
+ if (processDefinitionLog != null) {
+ moveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinitionLog);
+ }
+ }
checkBatchOperateResult(projectName, targetProject.getName(), result,
failedProcessList, false);
-
return result;
}
+ private ProcessDefinitionLog moveProcessDefinition(User loginUser, Long
targetProjectCode, ProcessDefinition processDefinition,
+ Map<String, Object>
result, List<String> failedProcessList) {
+ try {
+ Integer version =
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
+ ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
+ processDefinitionLog.setVersion(version == null || version == 0 ?
1 : version + 1);
+ processDefinitionLog.setProjectCode(targetProjectCode);
+ processDefinitionLog.setOperator(loginUser.getId());
+ Date now = new Date();
+ processDefinitionLog.setOperateTime(now);
+ processDefinitionLog.setUpdateTime(now);
+ processDefinitionLog.setCreateTime(now);
+ int update =
processDefinitionMapper.updateById(processDefinitionLog);
+ int insertLog =
processDefinitionLogMapper.insert(processDefinitionLog);
+ if ((insertLog & update) > 0) {
+ putMsg(result, Status.SUCCESS);
+ } else {
+ failedProcessList.add(processDefinition.getId() + "[" +
processDefinition.getName() + "]");
+ putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ }
+ return processDefinitionLog;
+ } catch (Exception e) {
+ putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ failedProcessList.add(processDefinition.getId() + "[" +
processDefinition.getName() + "]");
+ logger.error("move processDefinition error: {}", e.getMessage(),
e);
+ }
+ return null;
+ }
+
+ private void moveTaskRelation(User loginUser, Long projectCode,
ProcessDefinitionLog processDefinition) {
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinition.getCode());
+ if (!processTaskRelationList.isEmpty()) {
+ processTaskRelationMapper.deleteByCode(projectCode,
processDefinition.getCode());
+ }
+ Date now = new Date();
+ for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
+
processTaskRelation.setProjectCode(processDefinition.getProjectCode());
+
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
+ processTaskRelation.setCreateTime(now);
+ processTaskRelation.setUpdateTime(now);
+ processService.saveTaskRelation(loginUser, processTaskRelation);
+ }
+ }
+
/**
* switch the defined process definition verison
*
@@ -1586,30 +1632,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
/**
- * do batch move process definition
- *
- * @param targetProject targetProject
- * @param failedProcessList failedProcessList
- * @param processDefinitionIdList processDefinitionIdList
- */
- private void doBatchMoveProcessDefinition(Project targetProject,
List<String> failedProcessList, String[] processDefinitionIdList) {
- for (String processDefinitionId : processDefinitionIdList) {
- try {
- Map<String, Object> moveProcessDefinitionResult =
-
moveProcessDefinition(Integer.valueOf(processDefinitionId), targetProject);
- if
(!Status.SUCCESS.equals(moveProcessDefinitionResult.get(Constants.STATUS))) {
- setFailedProcessList(failedProcessList,
processDefinitionId);
- logger.error((String)
moveProcessDefinitionResult.get(Constants.MSG));
- }
- } catch (Exception e) {
- setFailedProcessList(failedProcessList, processDefinitionId);
- logger.error("move processDefinition error: {}",
e.getMessage(), e);
-
- }
- }
- }
-
- /**
* batch copy process definition
*
* @param loginUser loginUser
@@ -1669,34 +1691,6 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
/**
- * move process definition
- *
- * @param processId processId
- * @param targetProject targetProject
- * @return move result code
- */
- private Map<String, Object> moveProcessDefinition(Integer processId,
- Project targetProject) {
-
- Map<String, Object> result = new HashMap<>();
-
- ProcessDefinition processDefinition =
processDefinitionMapper.selectById(processId);
- if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
- return result;
- }
-
- processDefinition.setProjectId(targetProject.getId());
- processDefinition.setUpdateTime(new Date());
- if (processDefinitionMapper.updateById(processDefinition) > 0) {
- putMsg(result, Status.SUCCESS);
- } else {
- putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
- return result;
- }
-
- /**
* check batch operate result
*
* @param srcProjectName srcProjectName
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 7433824..7868348 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2370,16 +2370,20 @@ public class ProcessService {
}
}
for (ProcessTaskRelation processTaskRelation : builderRelationList) {
- processTaskRelationMapper.insert(processTaskRelation);
- // save process task relation log
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
- processTaskRelationLog.setOperator(operator.getId());
- processTaskRelationLog.setOperateTime(now);
- processTaskRelationLogMapper.insert(processTaskRelationLog);
+ saveTaskRelation(operator, processTaskRelation);
}
return 0;
}
+ public void saveTaskRelation(User operator, ProcessTaskRelation
processTaskRelation) {
+ processTaskRelationMapper.insert(processTaskRelation);
+ // save process task relation log
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog(processTaskRelation);
+ processTaskRelationLog.setOperator(operator.getId());
+ processTaskRelationLog.setOperateTime(new Date());
+ processTaskRelationLogMapper.insert(processTaskRelationLog);
+ }
+
public int saveTaskDefinition(User operator, Long projectCode, TaskNode
taskNode, TaskDefinition taskDefinition) {
Date now = new Date();
taskDefinition.setProjectCode(projectCode);
@@ -2499,44 +2503,6 @@ public class ProcessService {
}
/**
- * getTaskNodeFromTaskInstance
- * return null if task definition do not exists
- *
- * @param taskInstance
- * @return
- */
- public TaskNode getTaskNodeFromTaskInstance(TaskInstance taskInstance) {
- TaskNode taskNode = new TaskNode();
- ProcessInstance processInstance =
processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
- TaskDefinition taskDefinition =
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
- taskInstance.getTaskCode(),
- taskInstance.getTaskDefinitionVersion());
- if (taskDefinition == null) {
- return null;
- }
- List<ProcessTaskRelationLog> taskRelationList =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(
- processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()
- );
- Map<Long, Integer> taskCodeMap = new HashedMap();
-
- taskRelationList.forEach(relation ->
taskCodeMap.putIfAbsent(relation.getPostTaskCode(),
relation.getPostTaskVersion()));
-
- taskNode.setCode(taskDefinition.getCode());
- taskNode.setVersion(taskDefinition.getVersion());
- taskNode.setName(taskDefinition.getName());
- taskNode.setName(taskDefinition.getName());
- taskNode.setDesc(taskDefinition.getDescription());
- taskNode.setType(taskDefinition.getTaskType());
- taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ?
Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL);
- taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
- taskNode.setRetryInterval(taskDefinition.getFailRetryInterval());
- taskNode.setParams(taskDefinition.getTaskParams());
- taskNode.setTaskInstancePriority(taskDefinition.getTaskPriority());
- taskNode.setWorkerGroup(taskDefinition.getWorkerGroup());
- return taskNode;
- }
-
- /**
* find task definition by code and verision
*
* @param taskCode