This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/json_split by this push:
     new 069e9f9  [Feature][JsonSplit] modify ProcessService createTaskAndRelation (#4770)
069e9f9 is described below

commit 069e9f980ead8c3f35259a1df5e39fcd45ebb0b4
Author: JinyLeeChina <[email protected]>
AuthorDate: Fri Feb 12 08:43:44 2021 +0800

    [Feature][JsonSplit] modify ProcessService createTaskAndRelation (#4770)
    
    * add task query
    
    * modify codestyle
    
    * add task delete/update/switch method
    
    * add task delete/update/switch method
    
    * codestyle
    
    * use updateById save task definition
    
    * modify method name
    
    * code style
    
    * code style
    
    * modify ProcessService createTaskAndRelation
    
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../service/process/ProcessService.java            | 78 +++++++++++++---------
 .../service/process/ProcessServiceTest.java        |  2 +-
 2 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b6a6026..e47a5ac 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.model.DateInterval;
 import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -97,8 +96,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 
@@ -2278,11 +2277,12 @@ public class ProcessService {
     /**
      * create task definition and task relations
      */
-    public int createTaskAndRelation(User operator,
-                                     Long projectCode,
-                                     ProcessDefinition processDefinition,
-                                     ProcessData processData) {
+    public void createTaskAndRelation(User operator,
+                                      Long projectCode,
+                                      ProcessDefinition processDefinition,
+                                      ProcessData processData) {
         List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new 
ArrayList<>() : processData.getTasks();
+        Map<String, Long> taskNameAndCode = new HashMap<>();
         for (TaskNode taskNode : taskNodeList) {
             TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName());
             if (taskDefinition == null) {
@@ -2292,44 +2292,60 @@ public class ProcessService {
                     taskDefinition = new TaskDefinition();
                     taskDefinition.setCode(code);
                 } catch (SnowFlakeException e) {
-                    logger.error("Task code get error, ", e);
-                    return -1;
+                    throw new ServiceException("Task code get error", e);
                 }
                 saveTaskDefinition(operator, projectCode, taskNode, 
taskDefinition);
             } else {
                 if (isTaskOnline(taskDefinition.getCode())) {
-                    // TODO return something for fail
-                    return -1;
+                    throw new ServiceException(String.format("The task %s is 
on line in process", taskNode.getName()));
                 }
                 updateTaskDefinition(operator, projectCode, taskNode, 
taskDefinition);
             }
+            taskNameAndCode.put(taskNode.getName(), taskDefinition.getCode());
         }
         List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper.queryByProcessCode(projectCode, 
processDefinition.getCode());
         if (!processTaskRelationList.isEmpty()) {
             processTaskRelationMapper.deleteByCode(projectCode, 
processDefinition.getCode());
         }
-        // TODO parse taskNodeList for preTaskCode and postTaskCode
-        List<TaskNodeRelation> taskNodeRelationList = 
DagHelper.getProcessDag(taskNodeList).getEdges();
+        List<ProcessTaskRelation> builderRelationList = new ArrayList<>();
         Date now = new Date();
-        ProcessTaskRelation processTaskRelation = new 
ProcessTaskRelation("",// todo relation name
-                processDefinition.getVersion(),
-                projectCode,
-                processDefinition.getCode(),
-                0L,  // todo pre task code
-                0L, // todo post task code
-                ConditionType.of(""), // todo conditionType
-                "", // todo conditionParams
-                now,
-                now);
-        // save process task relation
-        int insert = processTaskRelationMapper.insert(processTaskRelation);
-        // save process task relation log
-        ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog();
-        processTaskRelationLog.set(processTaskRelation);
-        processTaskRelationLog.setOperator(operator.getId());
-        processTaskRelationLog.setOperateTime(now);
-        int logInsert = 
processTaskRelationLogMapper.insert(processTaskRelationLog);
-        return insert & logInsert;
+        for (TaskNode taskNode : taskNodeList) {
+            List<String> depList = taskNode.getDepList();
+            if (CollectionUtils.isNotEmpty(depList)) {
+                for (String preTaskName : depList) {
+                    builderRelationList.add(new ProcessTaskRelation("",// todo 
relation name
+                            processDefinition.getVersion(),
+                            projectCode,
+                            processDefinition.getCode(),
+                            taskNameAndCode.get(preTaskName),
+                            taskNameAndCode.get(taskNode.getName()),
+                            ConditionType.of("none"), // todo conditionType
+                            taskNode.getConditionResult(),
+                            now,
+                            now));
+                }
+            } else {
+                builderRelationList.add(new ProcessTaskRelation("",// todo 
relation name
+                        processDefinition.getVersion(),
+                        projectCode,
+                        processDefinition.getCode(),
+                        0L,
+                        taskNameAndCode.get(taskNode.getName()),
+                        ConditionType.of("none"), // todo conditionType
+                        taskNode.getConditionResult(),
+                        now,
+                        now));
+            }
+        }
+        for (ProcessTaskRelation processTaskRelation : builderRelationList) {
+            processTaskRelationMapper.insert(processTaskRelation);
+            // save process task relation log
+            ProcessTaskRelationLog processTaskRelationLog = new 
ProcessTaskRelationLog();
+            processTaskRelationLog.set(processTaskRelation);
+            processTaskRelationLog.setOperator(operator.getId());
+            processTaskRelationLog.setOperateTime(now);
+            processTaskRelationLogMapper.insert(processTaskRelationLog);
+        }
     }
 
     public int saveTaskDefinition(User operator, Long projectCode, TaskNode 
taskNode, TaskDefinition taskDefinition) {
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 551c9bb..b6d518c 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -439,7 +439,7 @@ public class ProcessServiceTest {
         String expect = JSONUtils.toJsonString(exceptProcessData);
         String oldJson = JSONUtils.toJsonString(oldProcessData);
 
-        Assert.assertEquals(expect, 
processService.changeJson(newProcessData,oldJson));
+        Assert.assertEquals(expect, processService.changeJson(newProcessData, 
oldJson));
 
     }
 }

Reply via email to