caishunfeng commented on a change in pull request #7214:
URL: https://github.com/apache/dolphinscheduler/pull/7214#discussion_r773718640



##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
##########
@@ -865,6 +883,166 @@ public DagDataSchedule 
exportProcessDagData(ProcessDefinition processDefinition)
         return result;
     }
 
+    @Override
+    @Transactional(rollbackFor = RuntimeException.class)
+    public Map<String, Object> importSqlProcessDefinition(User loginUser, long 
projectCode, MultipartFile file) {

Review comment:
       This is a big method; can you split it into several independent methods?

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
##########
@@ -865,6 +883,166 @@ public DagDataSchedule 
exportProcessDagData(ProcessDefinition processDefinition)
         return result;
     }
 
+    @Override
+    @Transactional(rollbackFor = RuntimeException.class)
+    public Map<String, Object> importSqlProcessDefinition(User loginUser, long 
projectCode, MultipartFile file) {
+        Map<String, Object> result = new HashMap<>();
+        String processDefinitionName = file.getOriginalFilename() == null ? 
file.getName() : file.getOriginalFilename();
+        int index = processDefinitionName.lastIndexOf(".");
+        if (index > 0) {
+            processDefinitionName = processDefinitionName.substring(0, index);
+        }
+        processDefinitionName = processDefinitionName + "_import_" + 
DateUtils.getCurrentTimeStamp();
+
+        ProcessDefinition processDefinition;
+        List<TaskDefinitionLog> taskDefinitionList = new ArrayList<>();
+        List<ProcessTaskRelationLog> processTaskRelationList = new 
ArrayList<>();
+
+        // In most cases, there will be only one data source
+        Map<String, DataSource> dataSourceCache = new HashMap<>(1);
+        Map<String, Long> taskNameToCode = new HashMap<>(16);
+        Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
+        try (ZipInputStream zIn = new ZipInputStream(file.getInputStream());
+             BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(zIn))) {
+            // build process definition
+            processDefinition = new ProcessDefinition(projectCode,
+                processDefinitionName,
+                CodeGenerateUtils.getInstance().genCode(),
+                "",
+                "[]", null,
+                0, loginUser.getId(), loginUser.getTenantId());
+            ZipEntry entry;
+            while ((entry = zIn.getNextEntry()) != null) {
+                if (!entry.isDirectory()) {
+                    StringBuilder sql = new StringBuilder();
+                    String taskName = null;
+                    String datasourceName = null;
+                    List<String> upstreams = Collections.emptyList();
+                    String line;
+                    while ((line = bufferedReader.readLine()) != null) {
+                        int commentIndex = line.indexOf("-- ");
+                        if (commentIndex >= 0) {
+                            int colonIndex = line.indexOf(":", commentIndex);
+                            if (colonIndex > 0) {
+                                String key = line.substring(commentIndex + 3, 
colonIndex).trim().toLowerCase();
+                                String value = line.substring(colonIndex + 
1).trim();
+                                switch (key) {
+                                    case "name":
+                                        taskName = value;
+                                        line = line.substring(0, commentIndex);
+                                        break;
+                                    case "upstream":
+                                        upstreams = 
Arrays.stream(value.split(",")).map(String::trim)
+                                            .filter(s -> 
!"".equals(s)).collect(Collectors.toList());
+                                        line = line.substring(0, commentIndex);
+                                        break;
+                                    case "datasource":
+                                        datasourceName = value;
+                                        line = line.substring(0, commentIndex);
+                                        break;
+                                    default:
+                                        break;
+                                }
+                            }
+                        }
+                        if (!"".equals(line)) {
+                            sql.append(line).append("\n");
+                        }
+                    }
+                    if (taskName == null) {
+                        taskName = entry.getName();
+                        index = taskName.indexOf("/");
+                        if (index > 0) {
+                            taskName = taskName.substring(index + 1);
+                        }
+                        index = taskName.lastIndexOf(".");
+                        if (index > 0) {
+                            taskName = taskName.substring(0, index);
+                        }
+                    }
+                    DataSource dataSource = 
dataSourceCache.get(datasourceName);
+                    if (dataSource == null) {
+                        if (isAdmin(loginUser)) {
+                            List<DataSource> dataSources  = 
dataSourceMapper.queryDataSourceByName(datasourceName);
+                            if (CollectionUtils.isNotEmpty(dataSources)) {
+                                dataSource = dataSources.get(0);
+                            }
+                        } else {
+                            dataSource = 
dataSourceMapper.queryDataSourceByNameAndUserId(loginUser.getId(), 
datasourceName);
+                        }
+                    }
+                    if (dataSource == null) {
+                        putMsg(result, Status.DATASOURCE_NAME_ILLEGAL);
+                        return result;
+                    }
+
+                    // build task definition
+                    TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
+                    taskDefinition.setName(taskName);
+                    taskDefinition.setFlag(Flag.YES);
+                    SqlParameters sqlParameters = new SqlParameters();
+                    sqlParameters.setType(dataSource.getType().name());
+                    sqlParameters.setDatasource(dataSource.getId());
+                    sqlParameters.setSql(sql.substring(0, sql.length() - 1));
+                    // it may be a query type, but it can only be determined 
by parsing SQL
+                    sqlParameters.setSqlType(SqlType.NON_QUERY.ordinal());
+                    sqlParameters.setLocalParams(Collections.emptyList());
+                    
taskDefinition.setTaskParams(JSONUtils.toJsonString(sqlParameters));
+                    
taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+                    taskDefinition.setTaskType(TaskType.SQL.getDesc());
+                    taskDefinition.setFailRetryTimes(0);
+                    taskDefinition.setFailRetryInterval(0);
+                    taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
+                    taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP);
+                    taskDefinition.setTaskPriority(Priority.MEDIUM);
+                    taskDefinition.setEnvironmentCode(-1);
+                    taskDefinition.setTimeout(0);
+                    taskDefinition.setDelayTime(0);
+                    
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
+                    taskDefinition.setVersion(0);
+                    taskDefinition.setResourceIds("");
+
+                    taskDefinitionList.add(taskDefinition);
+                    taskNameToCode.put(taskDefinition.getName(), 
taskDefinition.getCode());
+                    taskNameToUpstream.put(taskDefinition.getName(), 
upstreams);
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
+            return result;
+        }
+
+        // build task relation
+        for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) {
+            List<String> upstreams = taskNameToUpstream.get(entry.getKey());
+            if (CollectionUtils.isEmpty(upstreams)
+                || (upstreams.size() == 1 && upstreams.contains("root") && 
!taskNameToCode.containsKey("root"))) {
+                ProcessTaskRelationLog processTaskRelation = new 
ProcessTaskRelationLog();
+                processTaskRelation.setPreTaskCode(0);
+                processTaskRelation.setPreTaskVersion(0);
+                processTaskRelation.setPostTaskCode(entry.getValue());
+                processTaskRelation.setPostTaskVersion(0);
+                processTaskRelation.setConditionType(ConditionType.NONE);
+                processTaskRelation.setName("");
+                processTaskRelationList.add(processTaskRelation);
+                continue;
+            }
+            for (String upstream : upstreams) {
+                ProcessTaskRelationLog processTaskRelation = new 
ProcessTaskRelationLog();
+                
processTaskRelation.setPreTaskCode(taskNameToCode.get(upstream));
+                processTaskRelation.setPreTaskVersion(0);
+                processTaskRelation.setPostTaskCode(entry.getValue());
+                processTaskRelation.setPostTaskVersion(0);

Review comment:
       Would it be better to set the task version to 1?

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
##########
@@ -865,6 +883,166 @@ public DagDataSchedule 
exportProcessDagData(ProcessDefinition processDefinition)
         return result;
     }
 
+    @Override
+    @Transactional(rollbackFor = RuntimeException.class)
+    public Map<String, Object> importSqlProcessDefinition(User loginUser, long 
projectCode, MultipartFile file) {
+        Map<String, Object> result = new HashMap<>();
+        String processDefinitionName = file.getOriginalFilename() == null ? 
file.getName() : file.getOriginalFilename();
+        int index = processDefinitionName.lastIndexOf(".");
+        if (index > 0) {
+            processDefinitionName = processDefinitionName.substring(0, index);
+        }
+        processDefinitionName = processDefinitionName + "_import_" + 
DateUtils.getCurrentTimeStamp();
+
+        ProcessDefinition processDefinition;
+        List<TaskDefinitionLog> taskDefinitionList = new ArrayList<>();
+        List<ProcessTaskRelationLog> processTaskRelationList = new 
ArrayList<>();
+
+        // In most cases, there will be only one data source
+        Map<String, DataSource> dataSourceCache = new HashMap<>(1);
+        Map<String, Long> taskNameToCode = new HashMap<>(16);
+        Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
+        try (ZipInputStream zIn = new ZipInputStream(file.getInputStream());
+             BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(zIn))) {
+            // build process definition
+            processDefinition = new ProcessDefinition(projectCode,
+                processDefinitionName,
+                CodeGenerateUtils.getInstance().genCode(),
+                "",
+                "[]", null,
+                0, loginUser.getId(), loginUser.getTenantId());
+            ZipEntry entry;
+            while ((entry = zIn.getNextEntry()) != null) {
+                if (!entry.isDirectory()) {
+                    StringBuilder sql = new StringBuilder();
+                    String taskName = null;
+                    String datasourceName = null;
+                    List<String> upstreams = Collections.emptyList();
+                    String line;
+                    while ((line = bufferedReader.readLine()) != null) {
+                        int commentIndex = line.indexOf("-- ");

Review comment:
       Would it be better to add a format check? If the format is wrong, the 
user should be told the reason.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to