JinyLeeChina commented on a change in pull request #7214:
URL: https://github.com/apache/dolphinscheduler/pull/7214#discussion_r767443033
##########
File path:
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
##########
@@ -865,6 +882,149 @@ public DagDataSchedule
exportProcessDagData(ProcessDefinition processDefinition)
return result;
}
+ @Override
Review comment:
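Please add transaction control to this method (e.g. annotate it with
`@Transactional`), so that a failed import does not leave partial data behind.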
##########
File path:
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
##########
@@ -865,6 +882,149 @@ public DagDataSchedule
exportProcessDagData(ProcessDefinition processDefinition)
return result;
}
+ @Override
+ public Map<String, Object> importSqlProcessDefinition(User loginUser, long
projectCode, MultipartFile file) {
+ Map<String, Object> result = new HashMap<>();
+ String processDefinitionName = file.getOriginalFilename() == null ?
file.getName() : file.getOriginalFilename();
+ int index = processDefinitionName.lastIndexOf(".");
+ if (index > 0) {
+ processDefinitionName = processDefinitionName.substring(0, index);
+ }
+ // build process definition
+ Date now = new Date();
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setName(processDefinitionName);
+ processDefinition.setCreateTime(now);
+ processDefinition.setUpdateTime(now);
+ processDefinition.setFlag(Flag.YES);
+ processDefinition.setTenantId(-1);
+ processDefinition.setGlobalParamList(Collections.emptyList());
+
+ DagDataSchedule dagDataSchedule = new DagDataSchedule();
+ dagDataSchedule.setProcessDefinition(processDefinition);
+ List<TaskDefinition> taskDefinitionList = new ArrayList<>();
+ dagDataSchedule.setTaskDefinitionList(taskDefinitionList);
+ List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
+ dagDataSchedule.setProcessTaskRelationList(processTaskRelationList);
+
+ // In most cases, there will be only one data source
+ Map<String, DataSource> dataSourceCache = new HashMap<>(1);
+ Map<String, Long> taskNameToCode = new HashMap<>(16);
+ Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
+ try (ZipInputStream zIn = new ZipInputStream(file.getInputStream());
+ BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(zIn))) {
+ ZipEntry entry;
+ while ((entry = zIn.getNextEntry()) != null) {
+ if (!entry.isDirectory()) {
+ StringBuilder sql = new StringBuilder();
+ String taskName = null;
+ String datasourceName = null;
+ List<String> upstreams = Collections.emptyList();
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ int commentIndex = line.indexOf("--");
+ if (commentIndex >= 0) {
+ int colonIndex = line.indexOf(":", commentIndex);
+ if (colonIndex > 0) {
+ String key = line.substring(commentIndex + 2,
colonIndex).trim().toLowerCase();
+ String value = line.substring(colonIndex +
1).trim();
+ switch (key) {
+ case "name":
+ taskName = value;
+ break;
+ case "upstream":
+ upstreams =
Arrays.stream(value.split(",")).map(String::trim)
+ .filter(s ->
!"".equals(s)).collect(Collectors.toList());
+ break;
+ case "datasource":
+ datasourceName = value;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ sql.append(line).append("\n");
+ }
+ if (taskName == null) {
+ taskName =
entry.getName().substring(processDefinitionName.length() + 1);
+ index = taskName.lastIndexOf(".");
+ if (index > 0) {
+ taskName = taskName.substring(0, index);
+ }
+ }
+ DataSource dataSource =
dataSourceCache.get(datasourceName);
+ if (dataSource == null) {
+ if (isAdmin(loginUser)) {
+ List<DataSource> dataSources =
dataSourceMapper.queryDataSourceByName(datasourceName);
+ if (CollectionUtils.isNotEmpty(dataSources)) {
+ dataSource = dataSources.get(0);
+ }
+ } else {
+ dataSource =
dataSourceMapper.queryDataSourceByNameAndUserId(loginUser.getId(),
datasourceName);
+ }
+ }
+ if (dataSource == null) {
+ putMsg(result, Status.DATASOURCE_NAME_ILLEGAL);
+ return result;
+ }
+
+ // build task definition
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setName(taskName);
+ taskDefinition.setFlag(Flag.YES);
+ SqlParameters sqlParameters = new SqlParameters();
+ sqlParameters.setType(dataSource.getType().name());
+ sqlParameters.setDatasource(dataSource.getId());
+ sqlParameters.setSql(sql.substring(0, sql.length() - 1));
+ sqlParameters.setSqlType(SqlType.NON_QUERY.ordinal());
+ sqlParameters.setLocalParams(Collections.emptyList());
+
taskDefinition.setTaskParams(JSONUtils.toJsonString(sqlParameters));
+
taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+ taskDefinition.setTaskType(TaskType.SQL.getDesc());
+ taskDefinition.setFailRetryTimes(0);
+ taskDefinition.setFailRetryInterval(0);
+ taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
+ taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP);
+ taskDefinition.setTaskPriority(Priority.MEDIUM);
+ taskDefinition.setEnvironmentCode(-1);
+ taskDefinition.setResourceIds("");
+
+ taskDefinitionList.add(taskDefinition);
+ taskNameToCode.put(taskDefinition.getName(),
taskDefinition.getCode());
+ taskNameToUpstream.put(taskDefinition.getName(),
upstreams);
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
+ }
+
+ // build task relation
+ for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) {
+ List<String> upstreams = taskNameToUpstream.get(entry.getKey());
+ if (CollectionUtils.isEmpty(upstreams)
+ || (upstreams.size() == 1 && upstreams.contains("root") &&
!taskNameToCode.containsKey("root"))) {
+ ProcessTaskRelation processTaskRelation = new
ProcessTaskRelation();
+ processTaskRelation.setPreTaskCode(0);
+ processTaskRelation.setPostTaskCode(entry.getValue());
+ processTaskRelation.setConditionType(ConditionType.NONE);
+ processTaskRelationList.add(processTaskRelation);
+ continue;
+ }
+ for (String upstream : upstreams) {
+ ProcessTaskRelation processTaskRelation = new
ProcessTaskRelation();
+
processTaskRelation.setPreTaskCode(taskNameToCode.get(upstream));
+ processTaskRelation.setPostTaskCode(entry.getValue());
+ processTaskRelation.setConditionType(ConditionType.NONE);
+ processTaskRelationList.add(processTaskRelation);
+ }
+ }
+
+ checkAndImport(loginUser, projectCode, result, dagDataSchedule);
Review comment:
I think there is an essential difference between this SQL import and the
regular process definition import, so it is not appropriate to reuse
`checkAndImport` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]