This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7dfa997 refactor export process (#1772)
7dfa997 is described below
commit 7dfa9976a0643f1430f0b15deb9784089b813a41
Author: Yelli <[email protected]>
AuthorDate: Thu Jan 9 10:35:52 2020 +0800
refactor export process (#1772)
* refactor process export
* comment & null check for export process
* add License
* add javadoc
* add DataSourceParamTest and DependentParamTest
* refactor process export & add exportProcessMetaDataStr UT
---
.../api/controller/ProjectController.java | 2 +-
.../dolphinscheduler/api/dto/ProcessMeta.java | 247 +++++++++++++++++++++
.../api/service/ProcessDefinitionService.java | 129 +++++------
.../api/utils/exportprocess/DataSourceParam.java | 64 ++++++
.../api/utils/exportprocess/DependentParam.java | 77 +++++++
.../utils/exportprocess/TaskNodeParamFactory.java | 38 ++++
.../exportprocess/exportProcessAddTaskParam.java | 32 +++
.../api/service/ProcessDefinitionServiceTest.java | 94 +++++---
.../utils/exportprocess/DataSourceParamTest.java | 59 +++++
.../utils/exportprocess/DependentParamTest.java | 63 ++++++
pom.xml | 3 +-
11 files changed, 709 insertions(+), 99 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
index bc015c2..571b2ea 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
@@ -256,9 +256,9 @@ public class ProjectController extends BaseController {
/**
* import process definition
- *
* @param loginUser login user
* @param file resource file
+ * @param projectName project name
* @return import result code
*/
@ApiOperation(value = "importProcessDefinition", notes=
"EXPORT_PROCCESS_DEFINITION_NOTES")
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
new file mode 100644
index 0000000..7c4a5cf
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.dto;
+
+/**
+ * ProcessMeta
+ */
+public class ProcessMeta {
+
+ /**
+ * project name
+ */
+ private String projectName;
+
+ /**
+ * process definition name
+ */
+ private String processDefinitionName;
+
+ /**
+ * process definition json
+ */
+ private String processDefinitionJson;
+
+ /**
+ * process definition desc
+ */
+ private String processDefinitionDescription;
+
+ /**
+ * process definition locations
+ */
+ private String processDefinitionLocations;
+
+ /**
+ * process definition connects
+ */
+ private String processDefinitionConnects;
+
+ /**
+ * warning type
+ */
+ private String scheduleWarningType;
+
+ /**
+ * warning group id
+ */
+ private int scheduleWarningGroupId;
+
+ /**
+ * warning group name
+ */
+ private String scheduleWarningGroupName;
+
+ /**
+ * start time
+ */
+ private String scheduleStartTime;
+
+ /**
+ * end time
+ */
+ private String scheduleEndTime;
+
+ /**
+ * crontab
+ */
+ private String scheduleCrontab;
+
+ /**
+ * failure strategy
+ */
+ private String scheduleFailureStrategy;
+
+ /**
+ * release state
+ */
+ private String scheduleReleaseState;
+
+ /**
+ * process instance priority
+ */
+ private String scheduleProcessInstancePriority;
+
+ /**
+ * worker group id
+ */
+ private int scheduleWorkerGroupId;
+
+ /**
+ * worker group name
+ */
+ private String scheduleWorkerGroupName;
+
+ public ProcessMeta() {
+ }
+
+ public String getProjectName() {
+ return projectName;
+ }
+
+ public void setProjectName(String projectName) {
+ this.projectName = projectName;
+ }
+
+ public String getProcessDefinitionName() {
+ return processDefinitionName;
+ }
+
+ public void setProcessDefinitionName(String processDefinitionName) {
+ this.processDefinitionName = processDefinitionName;
+ }
+
+ public String getProcessDefinitionJson() {
+ return processDefinitionJson;
+ }
+
+ public void setProcessDefinitionJson(String processDefinitionJson) {
+ this.processDefinitionJson = processDefinitionJson;
+ }
+
+ public String getProcessDefinitionDescription() {
+ return processDefinitionDescription;
+ }
+
+ public void setProcessDefinitionDescription(String
processDefinitionDescription) {
+ this.processDefinitionDescription = processDefinitionDescription;
+ }
+
+ public String getProcessDefinitionLocations() {
+ return processDefinitionLocations;
+ }
+
+ public void setProcessDefinitionLocations(String
processDefinitionLocations) {
+ this.processDefinitionLocations = processDefinitionLocations;
+ }
+
+ public String getProcessDefinitionConnects() {
+ return processDefinitionConnects;
+ }
+
+ public void setProcessDefinitionConnects(String processDefinitionConnects)
{
+ this.processDefinitionConnects = processDefinitionConnects;
+ }
+
+ public String getScheduleWarningType() {
+ return scheduleWarningType;
+ }
+
+ public void setScheduleWarningType(String scheduleWarningType) {
+ this.scheduleWarningType = scheduleWarningType;
+ }
+
+ public int getScheduleWarningGroupId() {
+ return scheduleWarningGroupId;
+ }
+
+ public void setScheduleWarningGroupId(int scheduleWarningGroupId) {
+ this.scheduleWarningGroupId = scheduleWarningGroupId;
+ }
+
+ public String getScheduleWarningGroupName() {
+ return scheduleWarningGroupName;
+ }
+
+ public void setScheduleWarningGroupName(String scheduleWarningGroupName) {
+ this.scheduleWarningGroupName = scheduleWarningGroupName;
+ }
+
+ public String getScheduleStartTime() {
+ return scheduleStartTime;
+ }
+
+ public void setScheduleStartTime(String scheduleStartTime) {
+ this.scheduleStartTime = scheduleStartTime;
+ }
+
+ public String getScheduleEndTime() {
+ return scheduleEndTime;
+ }
+
+ public void setScheduleEndTime(String scheduleEndTime) {
+ this.scheduleEndTime = scheduleEndTime;
+ }
+
+ public String getScheduleCrontab() {
+ return scheduleCrontab;
+ }
+
+ public void setScheduleCrontab(String scheduleCrontab) {
+ this.scheduleCrontab = scheduleCrontab;
+ }
+
+ public String getScheduleFailureStrategy() {
+ return scheduleFailureStrategy;
+ }
+
+ public void setScheduleFailureStrategy(String scheduleFailureStrategy) {
+ this.scheduleFailureStrategy = scheduleFailureStrategy;
+ }
+
+ public String getScheduleReleaseState() {
+ return scheduleReleaseState;
+ }
+
+ public void setScheduleReleaseState(String scheduleReleaseState) {
+ this.scheduleReleaseState = scheduleReleaseState;
+ }
+
+ public String getScheduleProcessInstancePriority() {
+ return scheduleProcessInstancePriority;
+ }
+
+ public void setScheduleProcessInstancePriority(String
scheduleProcessInstancePriority) {
+ this.scheduleProcessInstancePriority = scheduleProcessInstancePriority;
+ }
+
+ public int getScheduleWorkerGroupId() {
+ return scheduleWorkerGroupId;
+ }
+
+ public void setScheduleWorkerGroupId(int scheduleWorkerGroupId) {
+ this.scheduleWorkerGroupId = scheduleWorkerGroupId;
+ }
+
+ public String getScheduleWorkerGroupName() {
+ return scheduleWorkerGroupName;
+ }
+
+ public void setScheduleWorkerGroupName(String scheduleWorkerGroupName) {
+ this.scheduleWorkerGroupName = scheduleWorkerGroupName;
+ }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index ea997af..80967ac 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -22,11 +22,14 @@ import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
+import
org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
+import
org.apache.dolphinscheduler.api.utils.exportprocess.exportProcessAddTaskParam;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -496,43 +499,7 @@ public class ProcessDefinitionService extends
BaseDAGService {
ProcessDefinition processDefinition =
processDefineMapper.queryByDefineId(processDefinitionId);
if (null != processDefinition) {
- //correct task param which has data source or dependent param
- String correctProcessDefinitionJson =
addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
-
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
-
- Map<String, Object> row = new LinkedHashMap<>();
- row.put("projectName", processDefinition.getProjectName());
- row.put("processDefinitionName", processDefinition.getName());
- row.put("processDefinitionJson",
processDefinition.getProcessDefinitionJson());
- row.put("processDefinitionDescription",
processDefinition.getDescription());
- row.put("processDefinitionLocations",
processDefinition.getLocations());
- row.put("processDefinitionConnects",
processDefinition.getConnects());
-
- //schedule info
- List<Schedule> schedules =
scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
- if (!schedules.isEmpty()) {
- Schedule schedule = schedules.get(0);
- row.put("scheduleWarningType", schedule.getWarningType());
- row.put("scheduleWarningGroupId",
schedule.getWarningGroupId());
- row.put("scheduleStartTime",
DateUtils.dateToString(schedule.getStartTime()));
- row.put("scheduleEndTime",
DateUtils.dateToString(schedule.getEndTime()));
- row.put("scheduleCrontab", schedule.getCrontab());
- row.put("scheduleFailureStrategy",
schedule.getFailureStrategy());
- row.put("scheduleReleaseState", ReleaseState.OFFLINE);
- row.put("scheduleProcessInstancePriority",
schedule.getProcessInstancePriority());
- if(schedule.getId() == -1){
- row.put("scheduleWorkerGroupId", -1);
- }else{
- WorkerGroup workerGroup =
workerGroupMapper.selectById(schedule.getWorkerGroupId());
- if(workerGroup != null){
- row.put("scheduleWorkerGroupName",
workerGroup.getName());
- }
- }
-
- }
-
- //create workflow json file
- String rowsJson = JSONUtils.toJsonString(row);
+ String exportProcessJson =
exportProcessMetaDataStr(processDefinitionId, processDefinition);
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.setHeader("Content-Disposition",
"attachment;filename="+processDefinition.getName()+".json");
BufferedOutputStream buff = null;
@@ -540,7 +507,7 @@ public class ProcessDefinitionService extends
BaseDAGService {
try {
out = response.getOutputStream();
buff = new BufferedOutputStream(out);
- buff.write(rowsJson.getBytes(StandardCharsets.UTF_8));
+
buff.write(exportProcessJson.getBytes(StandardCharsets.UTF_8));
buff.flush();
buff.close();
} catch (IOException e) {
@@ -560,13 +527,61 @@ public class ProcessDefinitionService extends
BaseDAGService {
logger.warn("export process output stream not
close", e);
}
}
-
}
}
}
}
/**
+ * get export process metadata string
+ * @param processDefinitionId process definition id
+ * @param processDefinition process definition
+ * @return export process metadata string
+ */
+ public String exportProcessMetaDataStr(Integer processDefinitionId,
ProcessDefinition processDefinition) {
+ //correct task param which has data source or dependent param
+ String correctProcessDefinitionJson =
addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
+
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
+
+ //export process metadata
+ ProcessMeta exportProcessMeta = new ProcessMeta();
+ exportProcessMeta.setProjectName(processDefinition.getProjectName());
+
exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
+
exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson());
+
exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
+
exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());
+
+ //schedule info
+ List<Schedule> schedules =
scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
+ if (!schedules.isEmpty()) {
+ Schedule schedule = schedules.get(0);
+ WorkerGroup workerGroup =
workerGroupMapper.selectById(schedule.getWorkerGroupId());
+
+ if (null == workerGroup && schedule.getWorkerGroupId() == -1) {
+ workerGroup = new WorkerGroup();
+ workerGroup.setId(-1);
+ workerGroup.setName("");
+ }
+
+
exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString());
+
exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId());
+
exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime()));
+
exportProcessMeta.setScheduleEndTime(DateUtils.dateToString(schedule.getEndTime()));
+ exportProcessMeta.setScheduleCrontab(schedule.getCrontab());
+
exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy()));
+
exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE));
+
exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority()));
+
+ if (null != workerGroup) {
+
exportProcessMeta.setScheduleWorkerGroupId(workerGroup.getId());
+
exportProcessMeta.setScheduleWorkerGroupName(workerGroup.getName());
+ }
+ }
+ //create workflow json file
+ return JSONUtils.toJsonString(exportProcessMeta);
+ }
+
+ /**
* correct task param which has datasource or dependent
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
@@ -580,35 +595,9 @@ public class ProcessDefinitionService extends
BaseDAGService {
if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
String taskType = taskNode.getString("type");
- if(checkTaskHasDataSource(taskType)){
- // add sqlParameters
- JSONObject sqlParameters =
JSONUtils.parseObject(taskNode.getString("params"));
- DataSource dataSource =
dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
- if (null != dataSource) {
- sqlParameters.put("datasourceName",
dataSource.getName());
- }
- taskNode.put("params", sqlParameters);
- }else if(checkTaskHasDependent(taskType)){
- // add dependent param
- JSONObject dependentParameters =
JSONUtils.parseObject(taskNode.getString("dependence"));
-
- if(null != dependentParameters){
- JSONArray dependTaskList = (JSONArray)
dependentParameters.get("dependTaskList");
- for (int j = 0; j < dependTaskList.size(); j++) {
- JSONObject dependentTaskModel =
dependTaskList.getJSONObject(j);
- JSONArray dependItemList = (JSONArray)
dependentTaskModel.get("dependItemList");
- for (int k = 0; k < dependItemList.size(); k++) {
- JSONObject dependentItem =
dependItemList.getJSONObject(k);
- int definitionId =
dependentItem.getInteger("definitionId");
- ProcessDefinition definition =
processDefineMapper.queryByDefineId(definitionId);
- if(null != definition){
-
dependentItem.put("projectName",definition.getProjectName());
-
dependentItem.put("definitionName",definition.getName());
- }
- }
- }
- taskNode.put("dependence", dependentParameters);
- }
+ exportProcessAddTaskParam addTaskParam =
TaskNodeParamFactory.getByTaskType(taskType);
+ if (null != addTaskParam) {
+ addTaskParam.addSpecialParam(taskNode);
}
}
}
@@ -648,7 +637,7 @@ public class ProcessDefinitionService extends
BaseDAGService {
* @param loginUser login user
* @param file process metadata json file
* @param currentProjectName current project name
- * @return
+ * @return import process
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> importProcessDefinition(User loginUser,
MultipartFile file, String currentProjectName) {
@@ -860,6 +849,8 @@ public class ProcessDefinitionService extends
BaseDAGService {
* recursion create sub process
* @param loginUser login user
* @param targetProject target project
+ * @param jsonArray process task array
+ * @param subProcessIdMap correct sub process id map
*/
public void importSubProcess(User loginUser, Project targetProject,
JSONArray jsonArray, Map<Integer, Integer> subProcessIdMap) {
for (int i = 0; i < jsonArray.size(); i++) {
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
new file mode 100644
index 0000000..c013aac
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.utils.exportprocess;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * task node add datasource param strategy
+ */
+@Service
+public class DataSourceParam implements exportProcessAddTaskParam,
InitializingBean {
+
+ @Autowired
+ private DataSourceMapper dataSourceMapper;
+
+ /**
+ * add datasource params
+ * @param taskNode task node json object
+ * @return task node json object
+ */
+ @Override
+ public JSONObject addSpecialParam(JSONObject taskNode) {
+ // add sqlParameters
+ JSONObject sqlParameters =
JSONUtils.parseObject(taskNode.getString("params"));
+ DataSource dataSource = dataSourceMapper.selectById((Integer)
sqlParameters.get("datasource"));
+ if (null != dataSource) {
+ sqlParameters.put("datasourceName", dataSource.getName());
+ }
+ taskNode.put("params", sqlParameters);
+
+ return taskNode;
+ }
+
+
+ /**
+ * put datasource strategy
+ */
+ @Override
+ public void afterPropertiesSet() {
+ TaskNodeParamFactory.register(TaskType.SQL.name(), this);
+ TaskNodeParamFactory.register(TaskType.PROCEDURE.name(), this);
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
new file mode 100644
index 0000000..bdf202c
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.utils.exportprocess;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * task node add dependent param strategy
+ */
+@Service
+public class DependentParam implements exportProcessAddTaskParam,
InitializingBean {
+
+
+ @Autowired
+ ProcessDefinitionMapper processDefineMapper;
+
+ /**
+ * add dependent param
+ * @param taskNode task node json object
+ * @return task node json object
+ */
+ @Override
+ public JSONObject addSpecialParam(JSONObject taskNode) {
+ // add dependent param
+ JSONObject dependentParameters =
JSONUtils.parseObject(taskNode.getString("dependence"));
+
+ if (null != dependentParameters) {
+ JSONArray dependTaskList = (JSONArray)
dependentParameters.get("dependTaskList");
+ for (int j = 0; j < dependTaskList.size(); j++) {
+ JSONObject dependentTaskModel =
dependTaskList.getJSONObject(j);
+ JSONArray dependItemList = (JSONArray)
dependentTaskModel.get("dependItemList");
+ for (int k = 0; k < dependItemList.size(); k++) {
+ JSONObject dependentItem = dependItemList.getJSONObject(k);
+ int definitionId =
dependentItem.getInteger("definitionId");
+ ProcessDefinition definition =
processDefineMapper.queryByDefineId(definitionId);
+ if (null != definition) {
+ dependentItem.put("projectName",
definition.getProjectName());
+ dependentItem.put("definitionName",
definition.getName());
+ }
+ }
+ }
+ taskNode.put("dependence", dependentParameters);
+ }
+
+ return taskNode;
+ }
+
+ /**
+ * put dependent strategy
+ */
+ @Override
+ public void afterPropertiesSet() {
+ TaskNodeParamFactory.register(TaskType.DEPENDENT.name(), this);
+ }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
new file mode 100644
index 0000000..f4faa15
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.utils.exportprocess;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * task node param factory
+ */
+public class TaskNodeParamFactory {
+
+ private static Map<String, exportProcessAddTaskParam> taskServices = new
ConcurrentHashMap<>();
+
+ public static exportProcessAddTaskParam getByTaskType(String taskType){
+ return taskServices.get(taskType);
+ }
+
+ static void register(String taskType, exportProcessAddTaskParam
addSpecialTaskParam){
+ if (null != taskType) {
+ taskServices.put(taskType, addSpecialTaskParam);
+ }
+ }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java
new file mode 100644
index 0000000..5ae1667
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.utils.exportprocess;
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * exportProcessAddTaskParam
+ */
+public interface exportProcessAddTaskParam {
+
+ /**
+ * add task special param: sql task, dependent task
+ * @param taskNode task node json object
+ * @return task node json object
+ */
+ JSONObject addSpecialParam(JSONObject taskNode);
+}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index b820797..82ba43d 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -22,16 +22,11 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.http.entity.ContentType;
import org.json.JSONException;
import org.junit.Assert;
@@ -52,8 +47,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.MessageFormat;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
@RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class)
@@ -75,6 +69,29 @@ public class ProcessDefinitionServiceTest {
@Mock
private ProjectService projectService;
+ @Mock
+ private ScheduleMapper scheduleMapper;
+
+ @Mock
+ private WorkerGroupMapper workerGroupMapper;
+
+ private String sqlDependentJson = "{\"globalParams\":[]," +
+
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
+ "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select
* from test\"," +
+
"\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\""
+
+ ",\"localParams\":[],\"connParams\":\"\"," +
+ "\"preStatements\":[],\"postStatements\":[]}," +
+
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
+
"\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
+
"\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+
+
"\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+
+
"\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\","
+
+
"\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\","
+
+
"\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+
+
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+
+
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+
@Test
public void queryProccessDefinitionList() throws Exception {
String projectName = "project_test1";
@@ -147,29 +164,22 @@ public class ProcessDefinitionServiceTest {
Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource());
Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition());
+ String corSqlDependentJson =
processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson);
- String sqlDependentJson = "{\"globalParams\":[]," +
-
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
-
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from
test\"," +
-
"\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\""
+
- ",\"localParams\":[],\"connParams\":\"\"," +
- "\"preStatements\":[],\"postStatements\":[]}," +
-
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+
- "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
-
"\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
-
"\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+
-
"\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+
-
"\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\","
+
-
"\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\","
+
-
"\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+
-
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+
-
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+ JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false);
- String corSqlDependentJson =
processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson);
+ }
+ @Test
+ public void testExportProcessMetaDataStr() {
+
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(getSchedulerList());
+ Mockito.when(workerGroupMapper.selectById(-1)).thenReturn(null);
- JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false);
+ ProcessDefinition processDefinition = getProcessDefinition();
+ processDefinition.setProcessDefinitionJson(sqlDependentJson);
+ String exportProcessMetaDataStr =
processDefinitionService.exportProcessMetaDataStr(46, processDefinition);
+ Assert.assertNotEquals(sqlDependentJson,exportProcessMetaDataStr);
}
/**
@@ -350,6 +360,34 @@ public class ProcessDefinitionServiceTest {
return project;
}
+ /**
+ * get mock schedule
+ * @return schedule
+ */
+ private Schedule getSchedule() {
+ Date date = new Date();
+ Schedule schedule = new Schedule();
+ schedule.setId(46);
+ schedule.setProcessDefinitionId(1);
+ schedule.setStartTime(date);
+ schedule.setEndTime(date);
+ schedule.setCrontab("0 0 5 * * ? *");
+ schedule.setFailureStrategy(FailureStrategy.END);
+ schedule.setUserId(1);
+ schedule.setReleaseState(ReleaseState.OFFLINE);
+ schedule.setProcessInstancePriority(Priority.MEDIUM);
+ schedule.setWarningType(WarningType.NONE);
+ schedule.setWarningGroupId(1);
+ schedule.setWorkerGroupId(-1);
+ return schedule;
+ }
+
+ private List<Schedule> getSchedulerList() {
+ List<Schedule> scheduleList = new ArrayList<>();
+ scheduleList.add(getSchedule());
+ return scheduleList;
+ }
+
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
new file mode 100644
index 0000000..0a271d9
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.utils.exportprocess;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.api.ApiApplicationServer;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.json.JSONException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+/**
+ * DataSourceParamTest
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = ApiApplicationServer.class)
+public class DataSourceParamTest {
+
+ @Test
+ public void testAddDependentSpecialParam() throws JSONException {
+
+ String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," +
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," +
+ "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," +
+ "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," +
+ "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}";
+
+
+ JSONObject taskNode = JSONUtils.parseObject(dependentJson);
+ if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
+ String taskType = taskNode.getString("type");
+
+ exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+
+ JSONObject dependent = addTaskParam.addSpecialParam(taskNode);
+
+ JSONAssert.assertEquals(taskNode.toString(),dependent.toString(),false);
+ }
+
+ }
+}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
new file mode 100644
index 0000000..db81138
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.utils.exportprocess;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.api.ApiApplicationServer;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.json.JSONException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+/**
+ * DependentParamTest
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = ApiApplicationServer.class)
+public class DependentParamTest {
+
+ @Test
+ public void testAddDependentSpecialParam() throws JSONException {
+
+ String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
+ "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
+ "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" +
+ ",\"localParams\":[],\"connParams\":\"\"," +
+ "\"preStatements\":[],\"postStatements\":[]}," +
+ "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
+ "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
+ "\"preTasks\":[\"dependent\"]}";
+
+
+ JSONObject taskNode = JSONUtils.parseObject(sqlJson);
+ if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
+ String taskType = taskNode.getString("type");
+
+ exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+
+ JSONObject sql = addTaskParam.addSpecialParam(taskNode);
+
+ JSONAssert.assertEquals(taskNode.toString(),sql.toString(),false);
+ }
+
+ }
+}
diff --git a/pom.xml b/pom.xml
index e0d02b4..09156b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -343,7 +343,6 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -680,6 +679,8 @@
<include>**/common/queue/*.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/utils/FileUtilsTest.java</include>
+ <include>**/api/utils/exportprocess/DataSourceParamTest.java</include>
+ <include>**/api/utils/exportprocess/DependentParamTest.java</include>
<include>**/api/enums/*.java</include>
<include>**/api/service/AccessTokenServiceTest.java</include>
<include>**/api/service/QueueServiceTest.java</include>