This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 26ed786 #1544 fix bug: workflow import (#1676)
26ed786 is described below
commit 26ed786c4d4cd1b8ae0e49a4a2ac7f2677a1948a
Author: Yelli <[email protected]>
AuthorDate: Thu Jan 2 19:45:05 2020 +0800
#1544 fix bug: workflow import (#1676)
* modify FileUtils.readFile2Str
* #1300 Add right alignment function in sql email content
* cancel formatted for alert_mail_template.ftl
* #747 sql task password Log desensitization
* cancel mail_temple
* edit ExcelUtils
* modify test method name
* #747 sql task password Log desensitization
* #1544 workflow import
* Constants add DATASOURCE_PASSWORD_REGEX
* #747 sql task password Log desensitization
* deal with import project have sub process
* modify export process addTaskNodeParam method name
* add testAddTaskNodeSpecialParam UT
* add ProcessDefinitionServiceTest-ut to pom
* add testImportSubProcess in ProcessDefinitionServiceTest
* add testImportSubProcess in ProcessDefinitionServiceTest
* add testImportProcessDefinition
---
.../controller/ProcessDefinitionController.java | 2 +-
.../api/controller/ProjectController.java | 9 +-
.../api/service/ProcessDefinitionService.java | 486 ++++++++++++++-------
.../api/service/ProcessDefinitionServiceTest.java | 294 ++++++++++++-
.../components/fileUpdate/definitionUpdate.vue | 1 +
pom.xml | 1 +
6 files changed, 628 insertions(+), 165 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index 275dfdd..f2e4c4d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -460,7 +460,7 @@ public class ProcessDefinitionController extends
BaseController{
}
}
- if(deleteFailedIdList.size() > 0){
+ if(!deleteFailedIdList.isEmpty()){
putMsg(result,
Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList.toArray(),","));
}else{
putMsg(result, Status.SUCCESS);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
index 66e065e..bc015c2 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
@@ -267,11 +267,12 @@ public class ProjectController extends BaseController {
})
@PostMapping(value="/import-definition")
public Result importProcessDefinition(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
- @RequestParam("file") MultipartFile
file){
+ @RequestParam("file") MultipartFile
file,
+ @RequestParam("projectName") String
projectName){
try{
- logger.info("import process definition by id, login user:{}",
- loginUser.getUserName());
- Map<String, Object> result =
processDefinitionService.importProcessDefinition(loginUser,file);
+ logger.info("import process definition by id, login user:{},
project: {}",
+ loginUser.getUserName(), projectName);
+ Map<String, Object> result =
processDefinitionService.importProcessDefinition(loginUser, file, projectName);
return returnDataList(result);
}catch (Exception e){
logger.error(IMPORT_PROCESS_DEFINE_ERROR.getMsg(),e);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index b114bc4..e80aa22 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -16,9 +16,18 @@
*/
package org.apache.dolphinscheduler.api.service;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
@@ -32,14 +41,6 @@ import
org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.slf4j.Logger;
@@ -56,8 +57,10 @@ import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
@@ -482,50 +485,21 @@ public class ProcessDefinitionService extends
BaseDAGService {
* @param response response
*/
public void exportProcessDefinitionById(User loginUser, String
projectName, Integer processDefinitionId, HttpServletResponse response) {
+ //export project info
Project project = projectMapper.queryByName(projectName);
+ //check user access for project
Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
+
if (resultStatus == Status.SUCCESS) {
+ //get workflow info
ProcessDefinition processDefinition =
processDefineMapper.queryByDefineId(processDefinitionId);
- if (processDefinition != null) {
- JSONObject jsonObject =
JSONUtils.parseObject(processDefinition.getProcessDefinitionJson());
- JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
- for (int i = 0; i < jsonArray.size(); i++) {
- JSONObject taskNode = jsonArray.getJSONObject(i);
- if (taskNode.get("type") != null && taskNode.get("type")
!= "") {
- String taskType = taskNode.getString("type");
- if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())){
- JSONObject sqlParameters =
JSONUtils.parseObject(taskNode.getString("params"));
- DataSource dataSource =
dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
- if (dataSource != null) {
- sqlParameters.put("datasourceName",
dataSource.getName());
- }
- taskNode.put("params", sqlParameters);
- }else if(taskType.equals(TaskType.DEPENDENT.name())){
- JSONObject dependentParameters =
JSONUtils.parseObject(taskNode.getString("dependence"));
- if(dependentParameters != null){
- JSONArray dependTaskList = (JSONArray)
dependentParameters.get("dependTaskList");
- for (int j = 0; j < dependTaskList.size();
j++) {
- JSONObject dependentTaskModel =
dependTaskList.getJSONObject(j);
- JSONArray dependItemList = (JSONArray)
dependentTaskModel.get("dependItemList");
- for (int k = 0; k < dependItemList.size();
k++) {
- JSONObject dependentItem =
dependItemList.getJSONObject(k);
- int definitionId =
dependentItem.getInteger("definitionId");
- ProcessDefinition definition =
processDefineMapper.queryByDefineId(definitionId);
- if(definition != null){
-
dependentItem.put("projectName",definition.getProjectName());
-
dependentItem.put("definitionName",definition.getName());
- }
- }
- }
- taskNode.put("dependence",
dependentParameters);
- }
- }
- }
- }
- jsonObject.put("tasks", jsonArray);
-
processDefinition.setProcessDefinitionJson(jsonObject.toString());
+
+ if (null != processDefinition) {
+ //correct task param which has data source or dependent param
+ String correctProcessDefinitionJson =
addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
+
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
Map<String, Object> row = new LinkedHashMap<>();
row.put("projectName", processDefinition.getProjectName());
@@ -535,8 +509,9 @@ public class ProcessDefinitionService extends
BaseDAGService {
row.put("processDefinitionLocations",
processDefinition.getLocations());
row.put("processDefinitionConnects",
processDefinition.getConnects());
+ //schedule info
List<Schedule> schedules =
scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
- if (schedules.size() > 0) {
+ if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
row.put("scheduleWarningType", schedule.getWarningType());
row.put("scheduleWarningGroupId",
schedule.getWarningGroupId());
@@ -556,6 +531,8 @@ public class ProcessDefinitionService extends
BaseDAGService {
}
}
+
+ //create workflow json file
String rowsJson = JSONUtils.toJsonString(row);
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.setHeader("Content-Disposition",
"attachment;filename="+processDefinition.getName()+".json");
@@ -564,38 +541,136 @@ public class ProcessDefinitionService extends
BaseDAGService {
try {
out = response.getOutputStream();
buff = new BufferedOutputStream(out);
- buff.write(rowsJson.getBytes("UTF-8"));
+ buff.write(rowsJson.getBytes(StandardCharsets.UTF_8));
buff.flush();
buff.close();
} catch (IOException e) {
- e.printStackTrace();
+ logger.warn("export process fail", e);
}finally {
- try {
- buff.close();
- out.close();
- } catch (Exception e) {
- e.printStackTrace();
+ if (null != buff) {
+ try {
+ buff.close();
+ } catch (Exception e) {
+ logger.warn("export process buffer not close", e);
+ }
+ }
+ if (null != out) {
+ try {
+ out.close();
+ } catch (Exception e) {
+ logger.warn("export process output stream not
close", e);
+ }
+ }
+
+ }
+ }
+ }
+ }
+
+ /**
+ * correct task param which has datasource or dependent
+ * @param processDefinitionJson processDefinitionJson
+ * @return correct processDefinitionJson
+ */
+ public String addTaskNodeSpecialParam(String processDefinitionJson) {
+ JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
+ JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
+
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JSONObject taskNode = jsonArray.getJSONObject(i);
+ if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
+ String taskType = taskNode.getString("type");
+
+ if(checkTaskHasDataSource(taskType)){
+ // add sqlParameters
+ JSONObject sqlParameters =
JSONUtils.parseObject(taskNode.getString("params"));
+ DataSource dataSource =
dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
+ if (null != dataSource) {
+ sqlParameters.put("datasourceName",
dataSource.getName());
+ }
+ taskNode.put("params", sqlParameters);
+ }else if(checkTaskHasDependent(taskType)){
+ // add dependent param
+ JSONObject dependentParameters =
JSONUtils.parseObject(taskNode.getString("dependence"));
+
+ if(null != dependentParameters){
+ JSONArray dependTaskList = (JSONArray)
dependentParameters.get("dependTaskList");
+ for (int j = 0; j < dependTaskList.size(); j++) {
+ JSONObject dependentTaskModel =
dependTaskList.getJSONObject(j);
+ JSONArray dependItemList = (JSONArray)
dependentTaskModel.get("dependItemList");
+ for (int k = 0; k < dependItemList.size(); k++) {
+ JSONObject dependentItem =
dependItemList.getJSONObject(k);
+ int definitionId =
dependentItem.getInteger("definitionId");
+ ProcessDefinition definition =
processDefineMapper.queryByDefineId(definitionId);
+ if(null != definition){
+
dependentItem.put("projectName",definition.getProjectName());
+
dependentItem.put("definitionName",definition.getName());
+ }
+ }
+ }
+ taskNode.put("dependence", dependentParameters);
}
}
}
}
+ jsonObject.put("tasks", jsonArray);
+ return jsonObject.toString();
+ }
+
+ /**
+ * check task if has dependent
+ * @param taskType task type
+ * @return if task has dependent return true else false
+ */
+ private boolean checkTaskHasDependent(String taskType) {
+ return taskType.equals(TaskType.DEPENDENT.name());
+ }
+
+ /**
+ * check task if has data source info
+ * @param taskType task type
+ * @return if task has data source return true else false
+ */
+ private boolean checkTaskHasDataSource(String taskType) {
+ return taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name());
}
+ /**
+ * check task if has sub process
+ * @param taskType task type
+ * @return if task has sub process return true else false
+ */
+ private boolean checkTaskHasSubProcess(String taskType) {
+ return taskType.equals(TaskType.SUB_PROCESS.name());
+ }
+
+ /**
+ * import process definition
+ * @param loginUser login user
+ * @param file process metadata json file
+ * @param currentProjectName current project name
+ * @return
+ */
@Transactional(rollbackFor = Exception.class)
- public Map<String, Object> importProcessDefinition(User loginUser,
MultipartFile file) {
+ public Map<String, Object> importProcessDefinition(User loginUser,
MultipartFile file, String currentProjectName) {
Map<String, Object> result = new HashMap<>(5);
- JSONObject json = null;
- try(InputStreamReader inputStreamReader = new InputStreamReader(
file.getInputStream(), "UTF-8" )) {
+ JSONObject json;
+
+ //read workflow json
+ try(InputStreamReader inputStreamReader = new InputStreamReader(
file.getInputStream(), StandardCharsets.UTF_8)) {
BufferedReader streamReader = new
BufferedReader(inputStreamReader);
StringBuilder respomseStrBuilder = new StringBuilder();
- String inputStr = "";
+ String inputStr;
+
while ((inputStr = streamReader.readLine())!= null){
respomseStrBuilder.append( inputStr );
}
+
json = JSONObject.parseObject( respomseStrBuilder.toString() );
- if(json != null){
- String projectName = null;
+
+ if(null != json){
+ String originProjectName = null;
String processDefinitionName = null;
String processDefinitionJson = null;
String processDefinitionDesc = null;
@@ -614,7 +689,7 @@ public class ProcessDefinitionService extends
BaseDAGService {
String scheduleWorkerGroupName = null;
if (ObjectUtils.allNotNull(json.get("projectName"))) {
- projectName = json.get("projectName").toString();
+ originProjectName = json.get("projectName").toString();
} else {
putMsg(result, Status.DATA_IS_NULL,
"processDefinitionName");
return result;
@@ -641,123 +716,226 @@ public class ProcessDefinitionService extends
BaseDAGService {
processDefinitionConnects =
json.get("processDefinitionConnects").toString();
}
- Project project = projectMapper.queryByName(projectName);
- if(project != null){
- processDefinitionName =
recursionProcessDefinitionName(project.getId(), processDefinitionName, 1);
- }
+ //check user access for org project
+ Project originProject =
projectMapper.queryByName(originProjectName);
+ Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, originProject, originProjectName);
+ Status resultStatus = (Status)
checkResult.get(Constants.STATUS);
- JSONObject jsonObject =
JSONUtils.parseObject(processDefinitionJson);
- JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
- for (int j = 0; j < jsonArray.size(); j++) {
- JSONObject taskNode = jsonArray.getJSONObject(j);
- String taskType = taskNode.getString("type");
- if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())) {
- JSONObject sqlParameters =
JSONUtils.parseObject(taskNode.getString("params"));
- List<DataSource> dataSources =
dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
- if (dataSources.size() > 0) {
- DataSource dataSource = dataSources.get(0);
- sqlParameters.put("datasource",
dataSource.getId());
- }
- taskNode.put("params", sqlParameters);
- }else if(taskType.equals(TaskType.DEPENDENT.name())){
- JSONObject dependentParameters =
JSONUtils.parseObject(taskNode.getString("dependence"));
- if(dependentParameters != null){
- JSONArray dependTaskList = (JSONArray)
dependentParameters.get("dependTaskList");
- for (int h = 0; h < dependTaskList.size(); h++) {
- JSONObject dependentTaskModel =
dependTaskList.getJSONObject(h);
- JSONArray dependItemList = (JSONArray)
dependentTaskModel.get("dependItemList");
- for (int k = 0; k < dependItemList.size();
k++) {
- JSONObject dependentItem =
dependItemList.getJSONObject(k);
- Project dependentItemProject =
projectMapper.queryByName(dependentItem.getString("projectName"));
- if(dependentItemProject != null){
- ProcessDefinition definition =
processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName"));
- if(definition != null){
-
dependentItem.put("projectId",dependentItemProject.getId());
-
dependentItem.put("definitionId",definition.getId());
+ if (resultStatus == Status.SUCCESS) {
+ //use currentProjectName to query
+ Project targetProject =
projectMapper.queryByName(currentProjectName);
+ if(null != targetProject){
+ processDefinitionName =
recursionProcessDefinitionName(targetProject.getId(), processDefinitionName, 1);
+ }
+
+ JSONObject jsonObject =
JSONUtils.parseObject(processDefinitionJson);
+ JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
+
+ for (int j = 0; j < jsonArray.size(); j++) {
+ JSONObject taskNode = jsonArray.getJSONObject(j);
+ String taskType = taskNode.getString("type");
+ if(checkTaskHasDataSource(taskType)) {
+ JSONObject sqlParameters =
JSONUtils.parseObject(taskNode.getString("params"));
+ List<DataSource> dataSources =
dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
+ if (!dataSources.isEmpty()) {
+ DataSource dataSource = dataSources.get(0);
+ sqlParameters.put("datasource",
dataSource.getId());
+ }
+ taskNode.put("params", sqlParameters);
+ }else if(checkTaskHasDependent(taskType)){
+ JSONObject dependentParameters =
JSONUtils.parseObject(taskNode.getString("dependence"));
+ if(dependentParameters != null){
+ JSONArray dependTaskList = (JSONArray)
dependentParameters.get("dependTaskList");
+ for (int h = 0; h < dependTaskList.size();
h++) {
+ JSONObject dependentTaskModel =
dependTaskList.getJSONObject(h);
+ JSONArray dependItemList = (JSONArray)
dependentTaskModel.get("dependItemList");
+ for (int k = 0; k < dependItemList.size();
k++) {
+ JSONObject dependentItem =
dependItemList.getJSONObject(k);
+ Project dependentItemProject =
projectMapper.queryByName(dependentItem.getString("projectName"));
+ if(dependentItemProject != null){
+ ProcessDefinition definition =
processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName"));
+ if(definition != null){
+
dependentItem.put("projectId",dependentItemProject.getId());
+
dependentItem.put("definitionId",definition.getId());
+ }
}
}
}
+ taskNode.put("dependence",
dependentParameters);
}
- taskNode.put("dependence", dependentParameters);
}
}
- }
- jsonObject.put("tasks", jsonArray);
- Map<String, Object> createProcessDefinitionResult =
createProcessDefinition(loginUser,projectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
- Integer processDefinitionId = null;
- if
(ObjectUtils.allNotNull(createProcessDefinitionResult.get("processDefinitionId")))
{
- processDefinitionId =
Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
- }
- if (ObjectUtils.allNotNull(json.get("scheduleCrontab")) &&
processDefinitionId != null) {
- Date now = new Date();
- Schedule scheduleObj = new Schedule();
- scheduleObj.setProjectName(projectName);
- scheduleObj.setProcessDefinitionId(processDefinitionId);
-
scheduleObj.setProcessDefinitionName(processDefinitionName);
- scheduleObj.setCreateTime(now);
- scheduleObj.setUpdateTime(now);
- scheduleObj.setUserId(loginUser.getId());
- scheduleObj.setUserName(loginUser.getUserName());
-
-
- scheduleCrontab = json.get("scheduleCrontab").toString();
- scheduleObj.setCrontab(scheduleCrontab);
- if (ObjectUtils.allNotNull(json.get("scheduleStartTime")))
{
- scheduleStartTime =
json.get("scheduleStartTime").toString();
-
scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime));
- }
- if (ObjectUtils.allNotNull(json.get("scheduleEndTime"))) {
- scheduleEndTime =
json.get("scheduleEndTime").toString();
-
scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime));
- }
- if
(ObjectUtils.allNotNull(json.get("scheduleWarningType"))) {
- scheduleWarningType =
json.get("scheduleWarningType").toString();
-
scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType));
- }
- if
(ObjectUtils.allNotNull(json.get("scheduleWarningGroupId"))) {
- scheduleWarningGroupId =
json.get("scheduleWarningGroupId").toString();
-
scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId));
- }
- if
(ObjectUtils.allNotNull(json.get("scheduleFailureStrategy"))) {
- scheduleFailureStrategy =
json.get("scheduleFailureStrategy").toString();
-
scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy));
- }
- if
(ObjectUtils.allNotNull(json.get("scheduleReleaseState"))) {
- scheduleReleaseState =
json.get("scheduleReleaseState").toString();
-
scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState));
+ //recursive sub-process parameter correction map key for
old process id value for new process id
+ Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
+
+ List<Object> subProcessList = jsonArray.stream()
+ .filter(elem ->
checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type")))
+ .collect(Collectors.toList());
+
+ if (!subProcessList.isEmpty()) {
+ importSubProcess(loginUser, targetProject, jsonArray,
subProcessIdMap);
}
- if
(ObjectUtils.allNotNull(json.get("scheduleProcessInstancePriority"))) {
- scheduleProcessInstancePriority =
json.get("scheduleProcessInstancePriority").toString();
-
scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority));
+
+ jsonObject.put("tasks", jsonArray);
+
+ Map<String, Object> createProcessDefinitionResult =
createProcessDefinition(loginUser,currentProjectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
+ Integer processDefinitionId = null;
+ if
(ObjectUtils.allNotNull(createProcessDefinitionResult.get("processDefinitionId")))
{
+ processDefinitionId =
Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
}
- if
(ObjectUtils.allNotNull(json.get("scheduleWorkerGroupId"))) {
- scheduleWorkerGroupId =
json.get("scheduleWorkerGroupId").toString();
- if(scheduleWorkerGroupId != null){
-
scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId));
- }else{
- if
(ObjectUtils.allNotNull(json.get("scheduleWorkerGroupName"))) {
- scheduleWorkerGroupName =
json.get("scheduleWorkerGroupName").toString();
- List<WorkerGroup> workerGroups =
workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
- if(workerGroups.size() > 0){
-
scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
+ if (ObjectUtils.allNotNull(json.get("scheduleCrontab")) &&
processDefinitionId != null) {
+ Date now = new Date();
+ Schedule scheduleObj = new Schedule();
+ scheduleObj.setProjectName(currentProjectName);
+
scheduleObj.setProcessDefinitionId(processDefinitionId);
+
scheduleObj.setProcessDefinitionName(processDefinitionName);
+ scheduleObj.setCreateTime(now);
+ scheduleObj.setUpdateTime(now);
+ scheduleObj.setUserId(loginUser.getId());
+ scheduleObj.setUserName(loginUser.getUserName());
+
+
+ scheduleCrontab =
json.get("scheduleCrontab").toString();
+ scheduleObj.setCrontab(scheduleCrontab);
+ if
(ObjectUtils.allNotNull(json.get("scheduleStartTime"))) {
+ scheduleStartTime =
json.get("scheduleStartTime").toString();
+
scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime));
+ }
+ if
(ObjectUtils.allNotNull(json.get("scheduleEndTime"))) {
+ scheduleEndTime =
json.get("scheduleEndTime").toString();
+
scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime));
+ }
+ if
(ObjectUtils.allNotNull(json.get("scheduleWarningType"))) {
+ scheduleWarningType =
json.get("scheduleWarningType").toString();
+
scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType));
+ }
+ if
(ObjectUtils.allNotNull(json.get("scheduleWarningGroupId"))) {
+ scheduleWarningGroupId =
json.get("scheduleWarningGroupId").toString();
+
scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId));
+ }
+ if
(ObjectUtils.allNotNull(json.get("scheduleFailureStrategy"))) {
+ scheduleFailureStrategy =
json.get("scheduleFailureStrategy").toString();
+
scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy));
+ }
+ if
(ObjectUtils.allNotNull(json.get("scheduleReleaseState"))) {
+ scheduleReleaseState =
json.get("scheduleReleaseState").toString();
+
scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState));
+ }
+ if
(ObjectUtils.allNotNull(json.get("scheduleProcessInstancePriority"))) {
+ scheduleProcessInstancePriority =
json.get("scheduleProcessInstancePriority").toString();
+
scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority));
+ }
+ if
(ObjectUtils.allNotNull(json.get("scheduleWorkerGroupId"))) {
+ scheduleWorkerGroupId =
json.get("scheduleWorkerGroupId").toString();
+ if(scheduleWorkerGroupId != null){
+
scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId));
+ }else{
+ if
(ObjectUtils.allNotNull(json.get("scheduleWorkerGroupName"))) {
+ scheduleWorkerGroupName =
json.get("scheduleWorkerGroupName").toString();
+ List<WorkerGroup> workerGroups =
workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
+ if(!workerGroups.isEmpty()){
+
scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
+ }
}
}
}
+ scheduleMapper.insert(scheduleObj);
}
- scheduleMapper.insert(scheduleObj);
+
+ putMsg(result, Status.SUCCESS);
+ return result;
}
}else{
- putMsg(result, Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR);
+ putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
return result;
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
- putMsg(result, Status.SUCCESS);
return result;
}
+ /**
+ * check import process has sub process
+ * recursion create sub process
+ * @param loginUser login user
+ * @param targetProject target project
+ */
+ public void importSubProcess(User loginUser, Project targetProject,
JSONArray jsonArray, Map<Integer, Integer> subProcessIdMap) {
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JSONObject taskNode = jsonArray.getJSONObject(i);
+ String taskType = taskNode.getString("type");
+
+ if (checkTaskHasSubProcess(taskType)) {
+ //get sub process info
+ JSONObject subParams =
JSONUtils.parseObject(taskNode.getString("params"));
+ Integer subProcessId =
subParams.getInteger("processDefinitionId");
+ ProcessDefinition subProcess =
processDefineMapper.queryByDefineId(subProcessId);
+ String subProcessJson = subProcess.getProcessDefinitionJson();
+ //check current project has sub process
+ ProcessDefinition currentProjectSubProcess =
processDefineMapper.queryByDefineName(targetProject.getId(),
subProcess.getName());
+
+ if (null == currentProjectSubProcess) {
+ JSONArray subJsonArray = (JSONArray)
JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks");
+
+ List<Object> subProcessList = subJsonArray.stream()
+ .filter(item ->
checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
+ .collect(Collectors.toList());
+
+ if (!subProcessList.isEmpty()) {
+ importSubProcess(loginUser, targetProject,
subJsonArray, subProcessIdMap);
+ //sub process processId correct
+ if (!subProcessIdMap.isEmpty()) {
+
+ for (Map.Entry<Integer, Integer> entry :
subProcessIdMap.entrySet()) {
+ String oldSubProcessId =
"\"processDefinitionId\":" + entry.getKey();
+ String newSubProcessId =
"\"processDefinitionId\":" + entry.getValue();
+ subProcessJson =
subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
+ }
+
+ subProcessIdMap.clear();
+ }
+ }
+
+ //if sub-process recursion
+ Date now = new Date();
+ //create sub process in target project
+ ProcessDefinition processDefine = new ProcessDefinition();
+ processDefine.setName(subProcess.getName());
+ processDefine.setVersion(subProcess.getVersion());
+
processDefine.setReleaseState(subProcess.getReleaseState());
+ processDefine.setProjectId(targetProject.getId());
+ processDefine.setUserId(loginUser.getId());
+ processDefine.setProcessDefinitionJson(subProcessJson);
+ processDefine.setDescription(subProcess.getDescription());
+ processDefine.setLocations(subProcess.getLocations());
+ processDefine.setConnects(subProcess.getConnects());
+ processDefine.setTimeout(subProcess.getTimeout());
+ processDefine.setTenantId(subProcess.getTenantId());
+
processDefine.setGlobalParams(subProcess.getGlobalParams());
+ processDefine.setCreateTime(now);
+ processDefine.setUpdateTime(now);
+ processDefine.setFlag(subProcess.getFlag());
+ processDefine.setReceivers(subProcess.getReceivers());
+ processDefine.setReceiversCc(subProcess.getReceiversCc());
+ processDefineMapper.insert(processDefine);
+
+ logger.info("create sub process, project: {}, process
name: {}", targetProject.getName(), processDefine.getName());
+
+ //modify task node
+ ProcessDefinition newSubProcessDefine =
processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName());
+
+ if (null != newSubProcessDefine) {
+ subProcessIdMap.put(subProcessId,
newSubProcessDefine.getId());
+ subParams.put("processDefinitionId",
newSubProcessDefine.getId());
+ taskNode.put("params", subParams);
+ }
+
+ }
+ }
+ }
+ }
/**
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 90c800f..b820797 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -16,38 +16,80 @@
*/
package org.apache.dolphinscheduler.api.service;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
-import com.alibaba.fastjson.JSON;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.http.entity.ContentType;
+import org.json.JSONException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.skyscreamer.jsonassert.JSONAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.mock.web.MockMultipartFile;
+import org.springframework.web.multipart.MultipartFile;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.HashMap;
import java.util.Map;
-@RunWith(SpringRunner.class)
+@RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class)
public class ProcessDefinitionServiceTest {
private static final Logger logger =
LoggerFactory.getLogger(ProcessDefinitionServiceTest.class);
- @Autowired
+ @InjectMocks
ProcessDefinitionService processDefinitionService;
+ @Mock
+ private DataSourceMapper dataSourceMapper;
+
+ @Mock
+ private ProcessDefinitionMapper processDefineMapper;
+
+ @Mock
+ private ProjectMapper projectMapper;
+
+ @Mock
+ private ProjectService projectService;
+
@Test
public void queryProccessDefinitionList() throws Exception {
+ String projectName = "project_test1";
+
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
+ Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
+ Map<String, Object> result = new HashMap<>(5);
+ putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
+
+
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
+
Map<String, Object> map =
processDefinitionService.queryProccessDefinitionList(loginUser,"project_test1");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT,
map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
@@ -55,10 +97,20 @@ public class ProcessDefinitionServiceTest {
@Test
public void queryProcessDefinitionListPagingTest() throws Exception {
+ String projectName = "project_test1";
+
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
+
+ Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
+
+ Map<String, Object> result = new HashMap<>(5);
+ putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
+
+
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
+
Map<String, Object> map =
processDefinitionService.queryProcessDefinitionListPaging(loginUser,
"project_test1", "",1, 5,0);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT,
map.get(Constants.STATUS));
@@ -67,13 +119,243 @@ public class ProcessDefinitionServiceTest {
@Test
public void deleteProcessDefinitionByIdTest() throws Exception {
+ String projectName = "project_test1";
+
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
+ Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
- Map<String, Object> map =
processDefinitionService.deleteProcessDefinitionById(loginUser, "li_sql_test",
6);
+
+ Map<String, Object> result = new HashMap<>(5);
+ putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
+
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
+
+ Map<String, Object> map =
processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 6);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT,
map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}
+
+ /**
+ * add datasource param and dependent when export process
+ * @throws JSONException
+ */
+ @Test
+ public void testAddTaskNodeSpecialParam() throws JSONException {
+
+
Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource());
+
Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition());
+
+
+ String sqlDependentJson = "{\"globalParams\":[]," +
+
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
+
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from
test\"," +
+
"\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\""
+
+ ",\"localParams\":[],\"connParams\":\"\"," +
+ "\"preStatements\":[],\"postStatements\":[]}," +
+
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
+
"\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
+
"\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+
+
"\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+
+
"\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\","
+
+
"\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\","
+
+
"\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+
+
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+
+
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+
+ String corSqlDependentJson =
processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson);
+
+
+ JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false);
+
+ }
+
+ /**
+ * import sub process test
+ */
+ @Test
+ public void testImportSubProcess() {
+
+ User loginUser = new User();
+ loginUser.setId(1);
+ loginUser.setUserType(UserType.ADMIN_USER);
+
+ Project testProject = getProject("test");
+
+ //Recursive subprocess sub2 process in sub1 process and sub1process in
top process
+ String topProcessJson = "{\"globalParams\":[]," +
+
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-38634\",\"name\":\"shell1\"," +
+
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho
\\\"shell-1\\\"\"}," +
+
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+
+
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+
+
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}," +
+
"{\"type\":\"SUB_PROCESS\",\"id\":\"tasks-44207\",\"name\":\"shell-4\"," +
+
"\"params\":{\"processDefinitionId\":39},\"description\":\"\",\"runFlag\":\"NORMAL\","
+
+
"\"dependence\":{},\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
+ "\"preTasks\":[\"shell1\"]}],\"tenantId\":1,\"timeout\":0}";
+
+ String sub1ProcessJson =
"{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-84090\"," +
+
"\"name\":\"shell-4\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho
\\\"shell-4\\\"\"}," +
+
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+
+
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+
+
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]},{\"type\":\"SUB_PROCESS\","
+
+ "\"id\":\"tasks-87364\",\"name\":\"shell-5\"," +
+
"\"params\":{\"processDefinitionId\":46},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},"
+
+
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+
+
"\"workerGroupId\":-1,\"preTasks\":[\"shell-4\"]}],\"tenantId\":1,\"timeout\":0}";
+
+ String sub2ProcessJson = "{\"globalParams\":[]," +
+
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
+
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
\\\"shell-5\\\"\"},\"description\":\"\"," +
+
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+
+
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,"
+
+ "\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+
+
+ JSONObject jsonObject = JSONUtils.parseObject(topProcessJson);
+ JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
+
+ String originSubJson = jsonArray.toString();
+
+ Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
+
+ ProcessDefinition shellDefinition1 = new ProcessDefinition();
+ shellDefinition1.setId(39);
+ shellDefinition1.setName("shell-4");
+ shellDefinition1.setProjectId(2);
+ shellDefinition1.setProcessDefinitionJson(sub1ProcessJson);
+
+ ProcessDefinition shellDefinition2 = new ProcessDefinition();
+ shellDefinition2.setId(46);
+ shellDefinition2.setName("shell-5");
+ shellDefinition2.setProjectId(2);
+ shellDefinition2.setProcessDefinitionJson(sub2ProcessJson);
+
+
Mockito.when(processDefineMapper.queryByDefineId(39)).thenReturn(shellDefinition1);
+
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
+
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(),
"shell-5")).thenReturn(null);
+
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(),
"shell-4")).thenReturn(null);
+
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(),
"testProject")).thenReturn(shellDefinition2);
+
+
processDefinitionService.importSubProcess(loginUser,testProject,jsonArray,subProcessIdMap);
+
+ String correctSubJson = jsonArray.toString();
+
+ Assert.assertEquals(originSubJson, correctSubJson);
+
+ }
+
+ @Test
+ public void testImportProcessDefinitionById() throws IOException {
+
+ String processJson =
"{\"projectName\":\"testProject\",\"processDefinitionName\":\"shell-4\"," +
+
"\"processDefinitionJson\":\"{\\\"tenantId\\\":1,\\\"globalParams\\\":[]," +
+
"\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"description\\\":\\\"\\\",\\\"runFlag\\\":\\\"NORMAL\\\","
+
+
"\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"#!/bin/bash\\\\necho
\\\\\\\"shell-4\\\\\\\"\\\"," +
+
"\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"},"
+
+
"\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-4\\\","
+
+
"\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-84090\\\"},"
+
+
"{\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-5\\\",\\\"workerGroupId\\\":-1,"
+
+
"\\\"description\\\":\\\"\\\",\\\"dependence\\\":{},\\\"preTasks\\\":[\\\"shell-4\\\"],\\\"id\\\":\\\"tasks-87364\\\","
+
+
"\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SUB_PROCESS\\\",\\\"params\\\":{\\\"processDefinitionId\\\":46},"
+
+
"\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}}],\\\"timeout\\\":0}\","
+
+
"\"processDefinitionDescription\":\"\",\"processDefinitionLocations\":\"{\\\"tasks-84090\\\":{\\\"name\\\":\\\"shell-4\\\","
+
+
"\\\"targetarr\\\":\\\"\\\",\\\"x\\\":128,\\\"y\\\":114},\\\"tasks-87364\\\":{\\\"name\\\":\\\"shell-5\\\","
+
+
"\\\"targetarr\\\":\\\"tasks-84090\\\",\\\"x\\\":266,\\\"y\\\":115}}\"," +
+
"\"processDefinitionConnects\":\"[{\\\"endPointSourceId\\\":\\\"tasks-84090\\\","
+
+ "\\\"endPointTargetId\\\":\\\"tasks-87364\\\"}]\"}";
+
+ String subProcessJson = "{\"globalParams\":[]," +
+
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
+
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo
\\\"shell-5\\\"\"},\"description\":\"\"," +
+
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+
+
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,"
+
+ "\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+
+ FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson);
+
+ File file = new File("/tmp/task.json");
+
+ FileInputStream fileInputStream = new
FileInputStream("/tmp/task.json");
+
+ MultipartFile multipartFile = new MockMultipartFile(file.getName(),
file.getName(),
+ ContentType.APPLICATION_OCTET_STREAM.toString(),
fileInputStream);
+
+ User loginUser = new User();
+ loginUser.setId(1);
+ loginUser.setUserType(UserType.ADMIN_USER);
+
+ String currentProjectName = "testProject";
+ Map<String, Object> result = new HashMap<>(5);
+ putMsg(result, Status.SUCCESS, currentProjectName);
+
+ ProcessDefinition shellDefinition2 = new ProcessDefinition();
+ shellDefinition2.setId(46);
+ shellDefinition2.setName("shell-5");
+ shellDefinition2.setProjectId(2);
+ shellDefinition2.setProcessDefinitionJson(subProcessJson);
+
+
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName));
+ Mockito.when(projectService.checkProjectAndAuth(loginUser,
getProject(currentProjectName), currentProjectName)).thenReturn(result);
+
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
+
+ //import process
+ Map<String, Object> importProcessResult =
processDefinitionService.importProcessDefinition(loginUser, multipartFile,
currentProjectName);
+
+ Assert.assertEquals(Status.SUCCESS,
importProcessResult.get(Constants.STATUS));
+
+ boolean delete = file.delete();
+
+ Assert.assertTrue(delete);
+
+
+ }
+
+ /**
+ * get mock datasource
+ * @return DataSource
+ */
+ private DataSource getDataSource(){
+ DataSource dataSource = new DataSource();
+ dataSource.setId(2);
+ dataSource.setName("test");
+ return dataSource;
+ }
+
+ /**
+ * get mock processDefinition
+ * @return ProcessDefinition
+ */
+ private ProcessDefinition getProcessDefinition(){
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(46);
+ processDefinition.setName("testProject");
+ processDefinition.setProjectId(2);
+ return processDefinition;
+ }
+
+ /**
+ * get mock Project
+ * @param projectName projectName
+ * @return Project
+ */
+ private Project getProject(String projectName){
+ Project project = new Project();
+ project.setId(1);
+ project.setName(projectName);
+ project.setUserId(1);
+ return project;
+ }
+
+ private void putMsg(Map<String, Object> result, Status status, Object...
statusParams) {
+ result.put(Constants.STATUS, status);
+ if (statusParams != null && statusParams.length > 0) {
+ result.put(Constants.MSG, MessageFormat.format(status.getMsg(),
statusParams));
+ } else {
+ result.put(Constants.MSG, status.getMsg());
+ }
+ }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue
b/dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue
index e8d8ee3..95851b7 100644
---
a/dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue
+++
b/dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue
@@ -141,6 +141,7 @@
let self = this
let formData = new FormData()
formData.append('file', this.file)
+ formData.append('projectName',this.store.state.dag.projectName)
io.post(`projects/import-definition`, res => {
this.$message.success(res.msg)
resolve()
diff --git a/pom.xml b/pom.xml
index 6892b38..cebff64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -675,6 +675,7 @@
<include>**/api/service/WorkerGroupServiceTest.java</include>
<include>**/api/service/AlertGroupServiceTest.java</include>
<include>**/api/service/ProjectServiceTest.java</include>
+
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/UdfFuncServiceTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include>
<include>**/alert/utils/FuncUtilsTest.java</include>