JinyLeeChina commented on a change in pull request #7214:
URL: https://github.com/apache/dolphinscheduler/pull/7214#discussion_r767443033



##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
##########
@@ -865,6 +882,149 @@ public DagDataSchedule 
exportProcessDagData(ProcessDefinition processDefinition)
         return result;
     }
 
+    @Override

Review comment:
       Please add transaction control

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
##########
@@ -865,6 +882,149 @@ public DagDataSchedule 
exportProcessDagData(ProcessDefinition processDefinition)
         return result;
     }
 
+    @Override
+    public Map<String, Object> importSqlProcessDefinition(User loginUser, long 
projectCode, MultipartFile file) {
+        Map<String, Object> result = new HashMap<>();
+        String processDefinitionName = file.getOriginalFilename() == null ? 
file.getName() : file.getOriginalFilename();
+        int index = processDefinitionName.lastIndexOf(".");
+        if (index > 0) {
+            processDefinitionName = processDefinitionName.substring(0, index);
+        }
+        // build process definition
+        Date now = new Date();
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setName(processDefinitionName);
+        processDefinition.setCreateTime(now);
+        processDefinition.setUpdateTime(now);
+        processDefinition.setFlag(Flag.YES);
+        processDefinition.setTenantId(-1);
+        processDefinition.setGlobalParamList(Collections.emptyList());
+
+        DagDataSchedule dagDataSchedule = new DagDataSchedule();
+        dagDataSchedule.setProcessDefinition(processDefinition);
+        List<TaskDefinition> taskDefinitionList = new ArrayList<>();
+        dagDataSchedule.setTaskDefinitionList(taskDefinitionList);
+        List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+        dagDataSchedule.setProcessTaskRelationList(processTaskRelationList);
+
+        // In most cases, there will be only one data source
+        Map<String, DataSource> dataSourceCache = new HashMap<>(1);
+        Map<String, Long> taskNameToCode = new HashMap<>(16);
+        Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
+        try (ZipInputStream zIn = new ZipInputStream(file.getInputStream());
+             BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(zIn))) {
+            ZipEntry entry;
+            while ((entry = zIn.getNextEntry()) != null) {
+                if (!entry.isDirectory()) {
+                    StringBuilder sql = new StringBuilder();
+                    String taskName = null;
+                    String datasourceName = null;
+                    List<String> upstreams = Collections.emptyList();
+                    String line;
+                    while ((line = bufferedReader.readLine()) != null) {
+                        int commentIndex = line.indexOf("--");
+                        if (commentIndex >= 0) {
+                            int colonIndex = line.indexOf(":", commentIndex);
+                            if (colonIndex > 0) {
+                                String key = line.substring(commentIndex + 2, 
colonIndex).trim().toLowerCase();
+                                String value = line.substring(colonIndex + 
1).trim();
+                                switch (key) {
+                                    case "name":
+                                        taskName = value;
+                                        break;
+                                    case "upstream":
+                                        upstreams = 
Arrays.stream(value.split(",")).map(String::trim)
+                                            .filter(s -> 
!"".equals(s)).collect(Collectors.toList());
+                                        break;
+                                    case "datasource":
+                                        datasourceName = value;
+                                        break;
+                                    default:
+                                        break;
+                                }
+                            }
+                        }
+                        sql.append(line).append("\n");
+                    }
+                    if (taskName == null) {
+                        taskName = 
entry.getName().substring(processDefinitionName.length() + 1);
+                        index = taskName.lastIndexOf(".");
+                        if (index > 0) {
+                            taskName = taskName.substring(0, index);
+                        }
+                    }
+                    DataSource dataSource = 
dataSourceCache.get(datasourceName);
+                    if (dataSource == null) {
+                        if (isAdmin(loginUser)) {
+                            List<DataSource> dataSources  = 
dataSourceMapper.queryDataSourceByName(datasourceName);
+                            if (CollectionUtils.isNotEmpty(dataSources)) {
+                                dataSource = dataSources.get(0);
+                            }
+                        } else {
+                            dataSource = 
dataSourceMapper.queryDataSourceByNameAndUserId(loginUser.getId(), 
datasourceName);
+                        }
+                    }
+                    if (dataSource == null) {
+                        putMsg(result, Status.DATASOURCE_NAME_ILLEGAL);
+                        return result;
+                    }
+
+                    // build task definition
+                    TaskDefinition taskDefinition = new TaskDefinition();
+                    taskDefinition.setName(taskName);
+                    taskDefinition.setFlag(Flag.YES);
+                    SqlParameters sqlParameters = new SqlParameters();
+                    sqlParameters.setType(dataSource.getType().name());
+                    sqlParameters.setDatasource(dataSource.getId());
+                    sqlParameters.setSql(sql.substring(0, sql.length() - 1));
+                    sqlParameters.setSqlType(SqlType.NON_QUERY.ordinal());
+                    sqlParameters.setLocalParams(Collections.emptyList());
+                    
taskDefinition.setTaskParams(JSONUtils.toJsonString(sqlParameters));
+                    
taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+                    taskDefinition.setTaskType(TaskType.SQL.getDesc());
+                    taskDefinition.setFailRetryTimes(0);
+                    taskDefinition.setFailRetryInterval(0);
+                    taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
+                    taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP);
+                    taskDefinition.setTaskPriority(Priority.MEDIUM);
+                    taskDefinition.setEnvironmentCode(-1);
+                    taskDefinition.setResourceIds("");
+
+                    taskDefinitionList.add(taskDefinition);
+                    taskNameToCode.put(taskDefinition.getName(), 
taskDefinition.getCode());
+                    taskNameToUpstream.put(taskDefinition.getName(), 
upstreams);
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
+        }
+
+        // build task relation
+        for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) {
+            List<String> upstreams = taskNameToUpstream.get(entry.getKey());
+            if (CollectionUtils.isEmpty(upstreams)
+                || (upstreams.size() == 1 && upstreams.contains("root") && 
!taskNameToCode.containsKey("root"))) {
+                ProcessTaskRelation processTaskRelation = new 
ProcessTaskRelation();
+                processTaskRelation.setPreTaskCode(0);
+                processTaskRelation.setPostTaskCode(entry.getValue());
+                processTaskRelation.setConditionType(ConditionType.NONE);
+                processTaskRelationList.add(processTaskRelation);
+                continue;
+            }
+            for (String upstream : upstreams) {
+                ProcessTaskRelation processTaskRelation = new 
ProcessTaskRelation();
+                
processTaskRelation.setPreTaskCode(taskNameToCode.get(upstream));
+                processTaskRelation.setPostTaskCode(entry.getValue());
+                processTaskRelation.setConditionType(ConditionType.NONE);
+                processTaskRelationList.add(processTaskRelation);
+            }
+        }
+
+        checkAndImport(loginUser, projectCode, result, dagDataSchedule);

Review comment:
       I think there is an essential difference between this and import. It is 
not appropriate to use `checkAndImport`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to