This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new 069e9f9 [Feature][JsonSplit] modify ProcessService
createTaskAndRelation (#4770)
069e9f9 is described below
commit 069e9f980ead8c3f35259a1df5e39fcd45ebb0b4
Author: JinyLeeChina <[email protected]>
AuthorDate: Fri Feb 12 08:43:44 2021 +0800
[Feature][JsonSplit] modify ProcessService createTaskAndRelation (#4770)
* add task query
* modify codestyle
* add task delete/update/swich method
* add task delete/update/swich method
* codestyle
* use updateById save task definition
* modify method name
* code style
* code style
* modify ProcessService createTaskAndRelation
Co-authored-by: JinyLeeChina <[email protected]>
---
.../service/process/ProcessService.java | 78 +++++++++++++---------
.../service/process/ProcessServiceTest.java | 2 +-
2 files changed, 48 insertions(+), 32 deletions(-)
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b6a6026..e47a5ac 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -97,8 +96,8 @@ import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -2278,11 +2277,12 @@ public class ProcessService {
/**
* create task definition and task relations
*/
- public int createTaskAndRelation(User operator,
- Long projectCode,
- ProcessDefinition processDefinition,
- ProcessData processData) {
+ public void createTaskAndRelation(User operator,
+ Long projectCode,
+ ProcessDefinition processDefinition,
+ ProcessData processData) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new
ArrayList<>() : processData.getTasks();
+ Map<String, Long> taskNameAndCode = new HashMap<>();
for (TaskNode taskNode : taskNodeList) {
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName());
if (taskDefinition == null) {
@@ -2292,44 +2292,60 @@ public class ProcessService {
taskDefinition = new TaskDefinition();
taskDefinition.setCode(code);
} catch (SnowFlakeException e) {
- logger.error("Task code get error, ", e);
- return -1;
+ throw new ServiceException("Task code get error", e);
}
saveTaskDefinition(operator, projectCode, taskNode,
taskDefinition);
} else {
if (isTaskOnline(taskDefinition.getCode())) {
- // TODO return something for fail
- return -1;
+ throw new ServiceException(String.format("The task %s is
on line in process", taskNode.getName()));
}
updateTaskDefinition(operator, projectCode, taskNode,
taskDefinition);
}
+ taskNameAndCode.put(taskNode.getName(), taskDefinition.getCode());
}
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode,
processDefinition.getCode());
}
- // TODO parse taskNodeList for preTaskCode and postTaskCode
- List<TaskNodeRelation> taskNodeRelationList =
DagHelper.getProcessDag(taskNodeList).getEdges();
+ List<ProcessTaskRelation> builderRelationList = new ArrayList<>();
Date now = new Date();
- ProcessTaskRelation processTaskRelation = new
ProcessTaskRelation("",// todo relation name
- processDefinition.getVersion(),
- projectCode,
- processDefinition.getCode(),
- 0L, // todo pre task code
- 0L, // todo post task code
- ConditionType.of(""), // todo conditionType
- "", // todo conditionParams
- now,
- now);
- // save process task relation
- int insert = processTaskRelationMapper.insert(processTaskRelation);
- // save process task relation log
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
- processTaskRelationLog.set(processTaskRelation);
- processTaskRelationLog.setOperator(operator.getId());
- processTaskRelationLog.setOperateTime(now);
- int logInsert =
processTaskRelationLogMapper.insert(processTaskRelationLog);
- return insert & logInsert;
+ for (TaskNode taskNode : taskNodeList) {
+ List<String> depList = taskNode.getDepList();
+ if (CollectionUtils.isNotEmpty(depList)) {
+ for (String preTaskName : depList) {
+ builderRelationList.add(new ProcessTaskRelation("",// todo
relation name
+ processDefinition.getVersion(),
+ projectCode,
+ processDefinition.getCode(),
+ taskNameAndCode.get(preTaskName),
+ taskNameAndCode.get(taskNode.getName()),
+ ConditionType.of("none"), // todo conditionType
+ taskNode.getConditionResult(),
+ now,
+ now));
+ }
+ } else {
+ builderRelationList.add(new ProcessTaskRelation("",// todo
relation name
+ processDefinition.getVersion(),
+ projectCode,
+ processDefinition.getCode(),
+ 0L,
+ taskNameAndCode.get(taskNode.getName()),
+ ConditionType.of("none"), // todo conditionType
+ taskNode.getConditionResult(),
+ now,
+ now));
+ }
+ }
+ for (ProcessTaskRelation processTaskRelation : builderRelationList) {
+ processTaskRelationMapper.insert(processTaskRelation);
+ // save process task relation log
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
+ processTaskRelationLog.set(processTaskRelation);
+ processTaskRelationLog.setOperator(operator.getId());
+ processTaskRelationLog.setOperateTime(now);
+ processTaskRelationLogMapper.insert(processTaskRelationLog);
+ }
}
public int saveTaskDefinition(User operator, Long projectCode, TaskNode
taskNode, TaskDefinition taskDefinition) {
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 551c9bb..b6d518c 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -439,7 +439,7 @@ public class ProcessServiceTest {
String expect = JSONUtils.toJsonString(exceptProcessData);
String oldJson = JSONUtils.toJsonString(oldProcessData);
- Assert.assertEquals(expect,
processService.changeJson(newProcessData,oldJson));
+ Assert.assertEquals(expect, processService.changeJson(newProcessData,
oldJson));
}
}