This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split by this push:
new 763a50d [Feature][JsonSplit] add relation method and refactor task
method (#4737)
763a50d is described below
commit 763a50d0cf600eccfdca9d240ec1fa11e0159456
Author: JinyLeeChina <[email protected]>
AuthorDate: Tue Feb 9 09:52:29 2021 +0800
[Feature][JsonSplit] add relation method and refactor task method (#4737)
* add task query
* modify codestyle
* add task delete/update/switch method
* add task delete/update/switch method
* codestyle
* use updateById save task definition
* modify method name
* code style
* code style
Co-authored-by: JinyLeeChina <[email protected]>
---
.../apache/dolphinscheduler/api/enums/Status.java | 4 +
.../api/service/ProcessInstanceService.java | 22 +-
.../api/service/ProcessTaskRelationService.java | 84 --------
.../service/impl/ProcessDefinitionServiceImpl.java | 22 +-
.../impl/ProcessDefinitionVersionServiceImpl.java | 22 +-
.../impl/ProcessTaskRelationServiceImpl.java | 161 +--------------
.../service/impl/TaskDefinitionServiceImpl.java | 91 ++-------
.../dao/entity/ProcessTaskRelation.java | 16 +-
.../dao/entity/ProcessTaskRelationLog.java | 27 +--
.../dao/entity/TaskDefinition.java | 41 ----
.../dao/mapper/ProcessDefinitionLogMapper.java | 7 +-
.../dao/mapper/ProcessTaskRelationLogMapper.java | 13 ++
.../dao/mapper/ProcessTaskRelationMapper.java | 33 ++-
.../dao/mapper/ProcessTaskRelationLogMapper.xml | 12 +-
.../dao/mapper/ProcessTaskRelationMapper.xml | 33 ++-
.../service/process/ProcessService.java | 223 +++++++++++++--------
sql/dolphinscheduler-postgre.sql | 8 +-
sql/dolphinscheduler_mysql.sql | 4 +-
18 files changed, 304 insertions(+), 519 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 6571930..510b425 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -254,6 +254,10 @@ public enum Status {
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process
definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error",
"导入工作流定义错误"),
TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist",
"任务定义[{0}]不存在"),
+ DELETE_TASK_DEFINE_BY_CODE_ERROR(50031, "delete task definition by code
error", "删除任务定义错误"),
+ DELETE_PROCESS_TASK_RELATION_ERROR(50032, "delete process task relation
error", "删除工作流任务关系错误"),
+ PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not
exist", "工作流任务关系[{0}]不存在"),
+ PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already
exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 7976882..3e99dc4 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -59,7 +59,6 @@ import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
-
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -459,16 +458,6 @@ public class ProcessInstanceService extends BaseService {
/**
* sync definition according process instance
- *
- * @param loginUser
- * @param project
- * @param processInstanceJson
- * @param locations
- * @param connects
- * @param processInstance
- * @param processDefinition
- * @param processData
- * @return
*/
private int syncDefinition(User loginUser, Project project, String
processInstanceJson, String locations, String connects,
ProcessInstance processInstance,
ProcessDefinition processDefinition,
@@ -491,13 +480,6 @@ public class ProcessInstanceService extends BaseService {
/**
* update process instance attributes
*
- * @param processInstance
- * @param tenant
- * @param scheduleTime
- * @param locations
- * @param connects
- * @param processInstanceJson
- * @param processData
* @return false if check failed or
*/
private void setProcessInstance(ProcessInstance processInstance, Tenant
tenant,
@@ -747,6 +729,7 @@ public class ProcessInstanceService extends BaseService {
/**
* query process instance by processDefinitionId and stateArray
+ *
* @param processDefinitionId processDefinitionId
* @param states states array
* @return process instance list
@@ -757,11 +740,12 @@ public class ProcessInstanceService extends BaseService {
/**
* query process instance by processDefinitionId
+ *
* @param processDefinitionId processDefinitionId
* @param size size
* @return process instance list
*/
- public List<ProcessInstance> queryByProcessDefineId(int
processDefinitionId,int size) {
+ public List<ProcessInstance> queryByProcessDefineId(int
processDefinitionId, int size) {
return
processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
index 0d1d0ea..76b6389 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
@@ -27,29 +27,6 @@ import java.util.Map;
public interface ProcessTaskRelationService {
/**
- * create process task relation
- *
- * @param loginUser login user
- * @param name relation name
- * @param projectName process name
- * @param processDefinitionCode process definition code
- * @param preTaskCode pre task code
- * @param postTaskCode post task code
- * @param conditionType condition type
- * @param conditionParams condition params
- * @return create result code
- */
- Map<String, Object> createProcessTaskRelation(User loginUser,
- String name,
- String projectName,
- Long processDefinitionCode,
- Long preTaskCode,
- Long postTaskCode,
- String conditionType,
- String conditionParams);
-
-
- /**
* query process task relation
*
* @param loginUser login user
@@ -59,66 +36,5 @@ public interface ProcessTaskRelationService {
Map<String, Object> queryProcessTaskRelation(User loginUser,
String projectName,
Long processDefinitionCode);
-
- /**
- * delete process task relation
- *
- * @param loginUser login user
- * @param projectName project name
- * @param processDefinitionCode process definition code
- */
- Map<String, Object> deleteTaskDefinitionByProcessCode(User loginUser,
- String projectName,
- Long
processDefinitionCode);
-
- /**
- * delete process task relation
- *
- * @param loginUser login user
- * @param projectName project name
- * @param preTaskCode pre task code
- */
- Map<String, Object> deleteTaskDefinitionByTaskCode(User loginUser,
- String projectName,
- Long preTaskCode);
-
-
- /**
- * update process task relation
- *
- * @param loginUser login user
- * @param id process task relation id
- * @param name relation name
- * @param projectName process name
- * @param processDefinitionCode process definition code
- * @param preTaskCode pre task code
- * @param postTaskCode post task code
- * @param conditionType condition type
- * @param conditionParams condition params
- */
- Map<String, Object> updateTaskDefinition(User loginUser,
- int id,
- String name,
- String projectName,
- Long processDefinitionCode,
- Long preTaskCode,
- Long postTaskCode,
- String conditionType,
- String conditionParams);
-
-
- /**
- * switch process task relation version
- *
- * @param loginUser login user
- * @param projectName project name
- * @param processTaskRelationId process task relation id
- * @param version the version user want to switch
- * @return switch process task relation version result code
- */
- Map<String, Object> switchVersion(User loginUser,
- String projectName,
- int processTaskRelationId,
- int version);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 03515e3..575cf19 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -26,10 +26,8 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
-import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
-import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -68,15 +66,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
-
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -133,12 +130,6 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
private ProjectService projectService;
@Autowired
- private TaskDefinitionService taskDefinitionService;
-
- @Autowired
- private ProcessTaskRelationService processTaskRelationService;
-
- @Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Autowired
@@ -157,7 +148,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
private ProcessService processService;
@Autowired
- private TaskDefinitionMapper taskDefinitionMapper;
+ private ProcessTaskRelationMapper processTaskRelationMapper;
/**
* create process definition
@@ -205,10 +196,11 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result;
}
- ProcessDefinitionLog processDefinitionLog =
processService.insertProcessDefinitionLog(loginUser, processDefinitionCode,
processDefinitionName, processData,
+ ProcessDefinitionLog processDefinitionLog =
processService.insertProcessDefinitionLog(loginUser, processDefinitionCode,
processDefinitionName, processData,
project, desc, locations, connects);
processService.switchVersion(processDefinition, processDefinitionLog);
- processService.createTaskAndRelation(loginUser, projectName, "",
processDefinition, processData);
+ // TODO relationName have ?
+ processService.createTaskAndRelation(loginUser, project.getCode(),
processDefinition, processData);
// return processDefinition object with ID
result.put(Constants.DATA_LIST, processDefinition.getId());
@@ -538,7 +530,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
// TODO: replace id to code
// ProcessDefinition processDefinition =
processDefineMapper.deleteByCode(processDefinitionCode);
int delete = processDefinitionMapper.deleteById(processDefinitionId);
-
+ processTaskRelationMapper.deleteByCode(project.getCode(),
processDefinition.getCode());
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
@@ -1613,7 +1605,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
}
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
-
.queryByDefinitionCodeAndVersion(processDefinition.getCode(),version);
+ .queryByDefinitionCodeAndVersion(processDefinition.getCode(),
version);
if (Objects.isNull(processDefinitionLog)) {
putMsg(result
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
index 0776688..fdc9d1f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
@@ -66,17 +66,17 @@ public class ProcessDefinitionVersionServiceImpl extends
BaseService implements
long version =
this.queryMaxVersionByProcessDefinitionId(processDefinition.getId()) + 1;
ProcessDefinitionVersion processDefinitionVersion =
ProcessDefinitionVersion
- .newBuilder()
- .processDefinitionId(processDefinition.getId())
- .version(version)
-
.processDefinitionJson(processDefinition.getProcessDefinitionJson())
- .description(processDefinition.getDescription())
- .locations(processDefinition.getLocations())
- .connects(processDefinition.getConnects())
- .timeout(processDefinition.getTimeout())
- .globalParams(processDefinition.getGlobalParams())
- .createTime(processDefinition.getUpdateTime())
- .warningGroupId(processDefinition.getWarningGroupId())
+ .newBuilder()
+ .processDefinitionId(processDefinition.getId())
+ .version(version)
+
.processDefinitionJson(processDefinition.getProcessDefinitionJson())
+ .description(processDefinition.getDescription())
+ .locations(processDefinition.getLocations())
+ .connects(processDefinition.getConnects())
+ .timeout(processDefinition.getTimeout())
+ .globalParams(processDefinition.getGlobalParams())
+ .createTime(processDefinition.getUpdateTime())
+ .warningGroupId(processDefinition.getWarningGroupId())
.resourceIds(processDefinition.getResourceIds())
.build();
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index f7c5706..5ea01bd 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -22,28 +22,20 @@ import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ConditionType;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
-import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
/**
* task definition service impl
@@ -61,166 +53,27 @@ public class ProcessTaskRelationServiceImpl extends
BaseService implements
private ProjectService projectService;
@Autowired
- private ProcessDefinitionMapper processDefineMapper;
-
- @Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
- @Autowired
- private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
-
- @Autowired
- private TaskDefinitionMapper taskDefinitionMapper;
-
/**
- * create process task relation
+ * query process task relation
*
* @param loginUser login user
- * @param name relation name
- * @param projectName process name
+ * @param projectName project name
* @param processDefinitionCode process definition code
- * @param preTaskCode pre task code
- * @param postTaskCode post task code
- * @param conditionType condition type
- * @param conditionParams condition params
- * @return create result code
*/
- @Transactional
@Override
- public Map<String, Object> createProcessTaskRelation(User loginUser,
- String name,
- String projectName,
- Long
processDefinitionCode,
- Long preTaskCode,
- Long postTaskCode,
- String conditionType,
- String
conditionParams) {
+ public Map<String, Object> queryProcessTaskRelation(User loginUser, String
projectName, Long processDefinitionCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectName);
- Status resultStatus = (Status) checkResult.get(Constants.STATUS);
- if (resultStatus != Status.SUCCESS) {
+ if (checkResult.get(Constants.STATUS) != Status.SUCCESS) {
return checkResult;
}
- // check processDefinitionCode
- ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processDefinitionCode);
- if (processDefinition == null) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
processDefinitionCode);
- return result;
- }
- // check preTaskCode and postTaskCode
- checkTaskDefinitionCode(result, preTaskCode);
- if (postTaskCode > 0) {
- checkTaskDefinitionCode(result, postTaskCode);
- }
- resultStatus = (Status) result.get(Constants.STATUS);
- if (resultStatus != Status.SUCCESS) {
- return result;
- }
- Date now = new Date();
- ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(name,
- 1,
- project.getCode(),
- processDefinitionCode,
- preTaskCode,
- postTaskCode,
- ConditionType.of(conditionType),
- conditionParams,
- now,
- now);
- // save process task relation
- processTaskRelationMapper.insert(processTaskRelation);
- // save process task relation log
- ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
- processTaskRelationLog.set(processTaskRelation);
- processTaskRelationLog.setOperator(loginUser.getId());
- processTaskRelationLog.setOperateTime(now);
- processTaskRelationLogMapper.insert(processTaskRelationLog);
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(project.getCode(),
processDefinitionCode);
+ result.put(Constants.DATA_LIST, processTaskRelationList);
putMsg(result, Status.SUCCESS);
return result;
}
-
- private void checkTaskDefinitionCode(Map<String, Object> result, Long
taskCode) {
- TaskDefinition taskDefinition =
taskDefinitionMapper.queryByDefinitionCode(taskCode);
- if (taskDefinition == null) {
- putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
- }
- }
-
- /**
- * query process task relation
- *
- * @param loginUser login user
- * @param projectName project name
- * @param processDefinitionCode process definition code
- */
- @Override
- public Map<String, Object> queryProcessTaskRelation(User loginUser, String
projectName, Long processDefinitionCode) {
- return null;
- }
-
- /**
- * delete process task relation
- *
- * @param loginUser login user
- * @param projectName project name
- * @param processDefinitionCode process definition code
- */
- @Override
- public Map<String, Object> deleteTaskDefinitionByProcessCode(User
loginUser, String projectName, Long processDefinitionCode) {
- return null;
- }
-
- /**
- * delete process task relation
- *
- * @param loginUser login user
- * @param projectName project name
- * @param preTaskCode pre task code
- */
- @Override
- public Map<String, Object> deleteTaskDefinitionByTaskCode(User loginUser,
String projectName, Long preTaskCode) {
- return null;
- }
-
- /**
- * update process task relation
- *
- * @param loginUser login user
- * @param id process task relation id
- * @param name relation name
- * @param projectName process name
- * @param processDefinitionCode process definition code
- * @param preTaskCode pre task code
- * @param postTaskCode post task code
- * @param conditionType condition type
- * @param conditionParams condition params
- */
- @Override
- public Map<String, Object> updateTaskDefinition(User loginUser,
- int id,
- String name,
- String projectName,
- Long processDefinitionCode,
- Long preTaskCode,
- Long postTaskCode,
- String conditionType,
- String conditionParams) {
- return null;
- }
-
- /**
- * switch process task relation version
- *
- * @param loginUser login user
- * @param projectName project name
- * @param processTaskRelationId process task relation id
- * @param version the version user want to switch
- * @return switch process task relation version result code
- */
- @Override
- public Map<String, Object> switchVersion(User loginUser, String
projectName, int processTaskRelationId, int version) {
- return null;
- }
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index e140052..8b1e672 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -23,26 +23,16 @@ import
org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.ReleaseState;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import
org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@@ -87,9 +77,6 @@ public class TaskDefinitionServiceImpl extends BaseService
implements
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
- private ProcessDefinitionMapper processDefinitionMapper;
-
- @Autowired
private ProcessService processService;
/**
@@ -99,7 +86,7 @@ public class TaskDefinitionServiceImpl extends BaseService
implements
* @param projectName project name
* @param taskDefinitionJson task definition json
*/
- @Transactional(rollbackFor = Exception.class)
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> createTaskDefinition(User loginUser,
String projectName,
@@ -119,9 +106,11 @@ public class TaskDefinitionServiceImpl extends BaseService
implements
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
+ TaskDefinition taskDefinition = new TaskDefinition();
long code = 0L;
try {
code = SnowFlakeUtils.getInstance().nextId();
+ taskDefinition.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
}
@@ -129,37 +118,10 @@ public class TaskDefinitionServiceImpl extends
BaseService implements
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error
generating task definition code");
return result;
}
- Date now = new Date();
- TaskDefinition taskDefinition = new TaskDefinition(code,
- taskNode.getName(),
- 1,
- taskNode.getDesc(),
- project.getCode(),
- loginUser.getId(),
- TaskType.of(taskNode.getType()),
- taskNode.getParams(),
- taskNode.isForbidden() ? Flag.NO : Flag.YES,
- taskNode.getTaskInstancePriority(),
- taskNode.getWorkerGroup(),
- taskNode.getMaxRetryTimes(),
- taskNode.getRetryInterval(),
- taskNode.getTaskTimeoutParameter().getEnable() ?
TimeoutFlag.OPEN : TimeoutFlag.CLOSE,
- taskNode.getTaskTimeoutParameter().getStrategy(),
- taskNode.getTaskTimeoutParameter().getInterval(),
- now,
- now);
-
taskDefinition.setResourceIds(processService.getResourceIds(taskDefinition));
- // save the new task definition
- taskDefinitionMapper.insert(taskDefinition);
- // save task definition log
- TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
- taskDefinitionLog.set(taskDefinition);
- taskDefinitionLog.setOperator(loginUser.getId());
- taskDefinitionLog.setOperateTime(now);
- taskDefinitionLogMapper.insert(taskDefinitionLog);
+ int insert = processService.saveTaskDefinition(loginUser,
project.getCode(), taskNode, taskDefinition);
// return taskDefinition object with code
result.put(Constants.DATA_LIST, code);
- putMsg(result, Status.SUCCESS);
+ putMsg(result, Status.SUCCESS, insert);
return result;
}
@@ -209,16 +171,20 @@ public class TaskDefinitionServiceImpl extends
BaseService implements
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
- checkTaskRelation(result, taskCode);
- resultEnum = (Status) result.get(Constants.STATUS);
- if (resultEnum != Status.SUCCESS) {
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCode(taskCode);
+ if (!processTaskRelationList.isEmpty()) {
+ Set<Long> processDefinitionCodes = processTaskRelationList
+ .stream()
+ .map(ProcessTaskRelation::getProcessDefinitionCode)
+ .collect(Collectors.toSet());
+ putMsg(result, Status.PROCESS_TASK_RELATION_EXIST,
StringUtils.join(processDefinitionCodes, ","));
return result;
}
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
- putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
+ putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
return result;
}
@@ -231,6 +197,7 @@ public class TaskDefinitionServiceImpl extends BaseService
implements
* @param taskCode task code
* @param taskDefinitionJson task definition json
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> updateTaskDefinition(User loginUser, String
projectName, Long taskCode, String taskDefinitionJson) {
Map<String, Object> result = new HashMap<>(5);
@@ -241,8 +208,8 @@ public class TaskDefinitionServiceImpl extends BaseService
implements
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
- checkTaskRelation(result, taskCode);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ if (processService.isTaskOnline(taskCode)) {
+ putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByDefinitionCode(taskCode);
@@ -255,30 +222,12 @@ public class TaskDefinitionServiceImpl extends
BaseService implements
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
-
- processService.updateTaskDefinition(loginUser, project.getCode(),
taskNode, taskDefinition);
+ int update = processService.updateTaskDefinition(loginUser,
project.getCode(), taskNode, taskDefinition);
result.put(Constants.DATA_LIST, taskCode);
- putMsg(result, Status.SUCCESS);
+ putMsg(result, Status.SUCCESS, update);
return result;
}
- public void checkTaskRelation(Map<String, Object> result, Long taskCode) {
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCode(taskCode, taskCode);
- if (!processTaskRelationList.isEmpty()) {
- Set<Long> processDefinitionCodes = processTaskRelationList
- .stream()
- .map(ProcessTaskRelation::getProcessDefinitionCode)
- .collect(Collectors.toSet());
- List<ProcessDefinition> processDefinitionList =
processDefinitionMapper.queryByCodes(processDefinitionCodes);
- // check process definition is already online
- for (ProcessDefinition processDefinition : processDefinitionList) {
- if (processDefinition.getReleaseState() ==
ReleaseState.ONLINE) {
- putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE,
processDefinition.getCode());
- return;
- }
- }
- }
- }
public void checkTaskNode(Map<String, Object> result, TaskNode taskNode,
String taskDefinitionJson) {
if (taskNode == null) {
@@ -310,8 +259,8 @@ public class TaskDefinitionServiceImpl extends BaseService
implements
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
- checkTaskRelation(result, taskCode);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ if (processService.isTaskOnline(taskCode)) {
+ putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByDefinitionCode(taskCode);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
index 3f8b256..13a804d 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
@@ -51,9 +51,9 @@ public class ProcessTaskRelation {
private String name;
/**
- * version
+ * process version
*/
- private int version;
+ private int processDefinitionVersion;
/**
* project code
@@ -113,7 +113,7 @@ public class ProcessTaskRelation {
}
public ProcessTaskRelation(String name,
- int version,
+ int processDefinitionVersion,
long projectCode,
long processDefinitionCode,
long preTaskCode,
@@ -123,7 +123,7 @@ public class ProcessTaskRelation {
Date createTime,
Date updateTime) {
this.name = name;
- this.version = version;
+ this.processDefinitionVersion = processDefinitionVersion;
this.projectCode = projectCode;
this.processDefinitionCode = processDefinitionCode;
this.preTaskCode = preTaskCode;
@@ -201,12 +201,12 @@ public class ProcessTaskRelation {
this.conditionParamMap = conditionParamMap;
}
- public int getVersion() {
- return version;
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
}
- public void setVersion(int version) {
- this.version = version;
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
}
public long getProjectCode() {
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
index e25e2df..523bf4e 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
@@ -17,12 +17,6 @@
package org.apache.dolphinscheduler.dao.entity;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
-import com.baomidou.mybatisplus.core.toolkit.StringUtils;
-import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -33,6 +27,13 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
+import com.fasterxml.jackson.annotation.JsonFormat;
+
/**
* process task relation log
*/
@@ -51,9 +52,9 @@ public class ProcessTaskRelationLog {
private String name;
/**
- * version
+ * process version
*/
- private int version;
+ private int processDefinitionVersion;
/**
* project code
@@ -187,12 +188,12 @@ public class ProcessTaskRelationLog {
this.conditionParamMap = conditionParamMap;
}
- public int getVersion() {
- return version;
+ public int getProcessDefinitionVersion() {
+ return processDefinitionVersion;
}
- public void setVersion(int version) {
- this.version = version;
+ public void setProcessDefinitionVersion(int processDefinitionVersion) {
+ this.processDefinitionVersion = processDefinitionVersion;
}
public long getProjectCode() {
@@ -253,7 +254,7 @@ public class ProcessTaskRelationLog {
public void set(ProcessTaskRelation processTaskRelation) {
this.name = processTaskRelation.getName();
- this.version = processTaskRelation.getVersion();
+ this.processDefinitionVersion =
processTaskRelation.getProcessDefinitionVersion();
this.projectCode = processTaskRelation.getProjectCode();
this.processDefinitionCode =
processTaskRelation.getProcessDefinitionCode();
this.preTaskCode = processTaskRelation.getPreTaskCode();
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index a7482cb..1158aae 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -167,47 +167,6 @@ public class TaskDefinition {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
- public TaskDefinition() {
- }
-
- public TaskDefinition(long code,
- String name,
- int version,
- String description,
- long projectCode,
- int userId,
- TaskType taskType,
- String taskParams,
- Flag flag,
- Priority taskPriority,
- String workerGroup,
- int failRetryTimes,
- int failRetryInterval,
- TimeoutFlag timeoutFlag,
- TaskTimeoutStrategy taskTimeoutStrategy,
- int timeout,
- Date createTime,
- Date updateTime) {
- this.code = code;
- this.name = name;
- this.version = version;
- this.description = description;
- this.projectCode = projectCode;
- this.userId = userId;
- this.taskType = taskType;
- this.taskParams = taskParams;
- this.flag = flag;
- this.taskPriority = taskPriority;
- this.workerGroup = workerGroup;
- this.failRetryTimes = failRetryTimes;
- this.failRetryInterval = failRetryInterval;
- this.timeoutFlag = timeoutFlag;
- this.taskTimeoutStrategy = taskTimeoutStrategy;
- this.timeout = timeout;
- this.createTime = createTime;
- this.updateTime = updateTime;
- }
-
public String getName() {
return name;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
index 18c3b96..c6d3a54 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+
import org.apache.ibatis.annotations.Param;
+
import java.util.List;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
@@ -35,7 +38,7 @@ public interface ProcessDefinitionLogMapper extends
BaseMapper<ProcessDefinition
* @return process definition log list
*/
List<ProcessDefinitionLog> queryByDefinitionName(@Param("projectCode")
Long projectCode,
- @Param("processDefinitionName") String
name);
+
@Param("processDefinitionName") String name);
/**
* query process definition log list
@@ -47,8 +50,6 @@ public interface ProcessDefinitionLogMapper extends
BaseMapper<ProcessDefinition
/**
* query max version for definition
- * @param processDefinitionCode
- * @return
*/
int queryMaxVersionForDefinition(@Param("processDefinitionCode") long
processDefinitionCode);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
index d2b5676..deda046 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
@@ -19,6 +19,10 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
@@ -26,4 +30,13 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
*/
public interface ProcessTaskRelationLogMapper extends
BaseMapper<ProcessTaskRelationLog> {
+ /**
+ * query process task relation log
+ *
+ * @param processCode process definition code
+ * @param processVersion process version
+ * @return process task relation log
+ */
+ List<ProcessTaskRelationLog>
queryByProcessCodeAndVersion(@Param("processCode") long processCode,
+
@Param("processVersion") int processVersion);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 1612fec..5e5fd31 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -21,6 +21,7 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.ibatis.annotations.Param;
+import java.util.Collection;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@@ -31,20 +32,38 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelation> {
/**
- * process task relation by processDefinitionCode
+ * process task relation by projectCode and processCode
*
- * @param processDefinitionCode processDefinitionCode
+ * @param projectCode projectCode
+ * @param processCode processCode
+ * @return ProcessTaskRelation list
+ */
+ List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") Long
projectCode,
+ @Param("processCode") Long
processCode);
+
+ /**
+ * process task relation by taskCode
+ *
+ * @param taskCodes taskCode list
* @return ProcessTaskRelation
*/
- List<ProcessTaskRelation>
queryByProcessDefinitionCode(@Param("processDefinitionCode") String
processDefinitionCode);
+ List<ProcessTaskRelation> queryByTaskCodes(@Param("taskCodes")
Collection<Long> taskCodes);
/**
* process task relation by taskCode
*
- * @param preTaskCode preTaskCode
- * @param postTaskCode postTaskCode
+ * @param taskCode taskCode
* @return ProcessTaskRelation
*/
- List<ProcessTaskRelation> queryByTaskCode(@Param("preTaskCode") Long
preTaskCode,
- @Param("postTaskCode") Long
postTaskCode);
+ List<ProcessTaskRelation> queryByTaskCode(@Param("taskCode") Long
taskCode);
+
+ /**
+ * delete process task relation by processCode
+ *
+ * @param projectCode projectCode
+ * @param processCode processCode
+ * @return int
+ */
+ int deleteByCode(@Param("projectCode") Long projectCode,
+ @Param("processCode") Long processCode);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
index 94990d4..e719af6 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
@@ -18,5 +18,15 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper">
-
+ <sql id="baseSql">
+ id, `name`, process_definition_version, project_code,
process_definition_code, pre_task_code, post_task_code,
+ condition_type, condition_params, operator, operate_time, create_time,
update_time
+ </sql>
+ <select id="queryByProcessCodeAndVersion"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation_log
+ WHERE process_definition_code = #{processCode}
+ and process_definition_version = #{processVersion}
+ </select>
</mapper>
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
index 65c6906..8249db5 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
@@ -19,22 +19,45 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper">
<sql id="baseSql">
- id, `name`, version, project_code, process_definition_code,
pre_task_code, post_task_code,
+ id, `name`, process_definition_version, project_code,
process_definition_code, pre_task_code, post_task_code,
condition_type, condition_params, create_time, update_time
</sql>
- <select id="queryByProcessDefinitionCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ <select id="queryByProcessCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
- WHERE process_definition_code = #{processDefinitionCode}
+ WHERE project_code = #{projectCode}
+ and process_definition_code = #{processCode}
</select>
<select id="queryByTaskCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
- WHERE pre_task_code = #{preTaskCode}
- <if test="postTaskCode != 0">
- or post_task_code = #{postTaskCode}
+ WHERE pre_task_code = #{taskCode}
+ <if test="taskCode != 0"> <!-- taskCode is the only parameter the mapper interface binds; postTaskCode no longer exists -->
+ or post_task_code = #{taskCode}
</if>
</select>
+ <select id="queryByTaskCodes"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
+ select
+ <include refid="baseSql"/>
+ from t_ds_process_task_relation
+ WHERE 1 = 1 <!-- when no task codes are supplied the filter below is omitted and every relation row is returned -->
+ <if test="taskCodes != null and taskCodes.size() != 0"> <!-- taskCodes is bound as a Collection, so size() rather than the array-only length -->
+ and (pre_task_code in
+ <foreach collection="taskCodes" index="index" item="i" open="("
separator="," close=")">
+ #{i}
+ </foreach>
+ or post_task_code in
+ <foreach collection="taskCodes" index="index" item="i" open="("
separator="," close=")">
+ #{i}
+ </foreach>
+ )
+ </if>
+ </select>
+ <delete id="deleteByCode">
+ delete from t_ds_process_task_relation
+ WHERE project_code = #{projectCode}
+ and process_definition_code = #{processCode}
+ </delete>
</mapper>
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 4acf6fe..b6a6026 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -31,6 +31,7 @@ import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -43,6 +44,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -52,6 +54,8 @@ import
org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import
org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@@ -63,6 +67,8 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource;
@@ -80,6 +86,8 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@@ -89,6 +97,7 @@ import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -139,8 +148,6 @@ public class ProcessService {
@Autowired
private ProcessDefinitionLogMapper processDefineLogMapper;
-
-
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@@ -181,7 +188,10 @@ public class ProcessService {
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
- private ProcessDefinitionLogMapper processDefinitionLogMapper;
+ private ProcessTaskRelationMapper processTaskRelationMapper;
+
+ @Autowired
+ private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
@@ -381,8 +391,6 @@ public class ProcessService {
/**
* covert log to process definition
- * @param processDefinitionLog
- * @return
*/
public ProcessDefinition convertFromLog(ProcessDefinitionLog
processDefinitionLog) {
ProcessDefinition definition = null;
@@ -743,10 +751,10 @@ public class ProcessService {
processInstance =
this.findProcessInstanceDetailById(processInstanceId);
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ getCommandTypeIfComplement(processInstance, command),
+ processInstance.getScheduleTime()));
}
processDefinition =
processDefineMapper.selectById(processInstance.getProcessDefinitionId());
processInstance.setProcessDefinition(processDefinition);
@@ -1249,14 +1257,12 @@ public class ProcessService {
*/
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance,
ExecutionStatus processInstanceState) {
ExecutionStatus state = taskInstance.getState();
- if (
- // running, delayed or killed
- // the task already exists in task queue
- // return state
- state == ExecutionStatus.RUNNING_EXECUTION
- || state == ExecutionStatus.DELAY_EXECUTION
- || state == ExecutionStatus.KILL
- ) {
+ // running, delayed or killed
+ // the task already exists in task queue
+ // return state
+ if (state == ExecutionStatus.RUNNING_EXECUTION
+ || state == ExecutionStatus.DELAY_EXECUTION
+ || state == ExecutionStatus.KILL) {
return state;
}
//return pasue /stop if process instance state is ready pause / stop
@@ -2058,8 +2064,6 @@ public class ProcessService {
/**
* solve the branch rename bug
*
- * @param processData
- * @param oldJson
* @return String
*/
public String changeJson(ProcessData processData, String oldJson) {
@@ -2114,10 +2118,6 @@ public class ProcessService {
/**
* switch process definition version to process definition log version
- *
- * @param processDefinition
- * @param processDefinitionLog
- * @return
*/
public int switchVersion(ProcessDefinition processDefinition,
ProcessDefinitionLog processDefinitionLog) {
if (null == processDefinition || null == processDefinitionLog) {
@@ -2132,23 +2132,29 @@ public class ProcessService {
int switchResult = 0;
if (0 == processDefinition.getId()) {
-
switchResult = processDefineMapper.insert(tmpDefinition);
} else {
switchResult = processDefineMapper.updateById(tmpDefinition);
}
- //TODO... switch task relations
+ switchProcessTaskRelationVersion(processDefinition);
return switchResult;
}
+ public void switchProcessTaskRelationVersion(ProcessDefinition
processDefinition) {
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+ if (!processTaskRelationList.isEmpty()) {
+
processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+ }
+ List<ProcessTaskRelationLog> processTaskRelationLogList =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(),
processDefinition.getVersion());
+ for (ProcessTaskRelationLog processTaskRelationLog :
processTaskRelationLogList) {
+ ProcessTaskRelation processTaskRelation =
JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
+ ProcessTaskRelation.class);
+ processTaskRelationMapper.insert(processTaskRelation);
+ }
+ }
+
/**
* update task definition
- *
- * @param operator
- * @param projectCode
- * @param taskNode
- * @param taskDefinition
- * @return
*/
public int updateTaskDefinition(User operator, Long projectCode, TaskNode
taskNode, TaskDefinition taskDefinition) {
@@ -2159,12 +2165,24 @@ public class ProcessService {
.max((x, y) -> x > y ? x : y)
.orElse(0) + 1;
Date now = new Date();
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setUserId(operator.getId());
taskDefinition.setVersion(version);
- taskDefinition.setCode(taskDefinition.getCode());
+ taskDefinition.setUpdateTime(now);
+ setTaskFromTaskNode(taskNode, taskDefinition);
+ int update = taskDefinitionMapper.updateById(taskDefinition);
+ // save task definition log
+ TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+ taskDefinitionLog.set(taskDefinition);
+ taskDefinitionLog.setOperator(operator.getId());
+ taskDefinitionLog.setOperateTime(now);
+ int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
+ return insert & update;
+ }
+
+ private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition
taskDefinition) {
taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc());
- taskDefinition.setProjectCode(projectCode);
- taskDefinition.setUserId(operator.getId());
taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
taskDefinition.setTaskParams(taskNode.getParams());
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
@@ -2175,16 +2193,7 @@ public class ProcessService {
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ?
TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
- taskDefinition.setUpdateTime(now);
taskDefinition.setResourceIds(getResourceIds(taskDefinition));
- int update = taskDefinitionMapper.updateById(taskDefinition);
- // save task definition log
- TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
- taskDefinitionLog.set(taskDefinition);
- taskDefinitionLog.setOperator(operator.getId());
- taskDefinitionLog.setOperateTime(now);
- int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
- return insert & update;
}
/**
@@ -2212,15 +2221,7 @@ public class ProcessService {
}
/**
- * @param operator
- * @param name
- * @param desc
- * @param locations
- * @param connects
- * @param project
- * @param processData
- * @param processDefinition
- * @return
+ *
*/
public int saveProcessDefinition(User operator, Project project, String
name, String desc, String locations,
String connects, ProcessData processData,
ProcessDefinition processDefinition) {
@@ -2230,28 +2231,20 @@ public class ProcessService {
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByDefinitionName(project.getCode(), task.getName());
updateTaskDefinition(operator, project.getCode(), task,
taskDefinition);
}
- createTaskAndRelation(operator, project.getName(), "",
processDefinition, processData);
+ createTaskAndRelation(operator, project.getCode(), processDefinition,
processData);
ProcessDefinitionLog processDefinitionLog =
insertProcessDefinitionLog(operator, processDefinition.getCode(),
name, processData, project, desc, locations, connects);
return switchVersion(processDefinition, processDefinitionLog);
}
/**
- * @param operator
- * @param processDefinitionCode
- * @param processDefinitionName
- * @param processData
- * @param project
- * @param desc
- * @param locations
- * @param connects
- * @return
+ *
*/
public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long
processDefinitionCode, String processDefinitionName,
ProcessData
processData, Project project,
String desc, String
locations, String connects) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
- int version =
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode());
+ int version =
processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode());
processDefinitionLog.setCode(processDefinitionCode);
processDefinitionLog.setVersion(version);
processDefinitionLog.setName(processDefinitionName);
@@ -2275,7 +2268,7 @@ public class ProcessService {
}
processDefinitionLog.setGlobalParamList(globalParamsList);
processDefinitionLog.setFlag(Flag.YES);
- int insert = processDefinitionLogMapper.insert(processDefinitionLog);
+ int insert = processDefineLogMapper.insert(processDefinitionLog);
if (insert > 0) {
return processDefinitionLog;
}
@@ -2283,24 +2276,96 @@ public class ProcessService {
}
/**
- * create task defintion and task relations
- *
- * @param loginUser
- * @param projectName
- * @param relationName
- * @param processDefinition
- * @param processData
- * @return
+ * create task definition and task relations
*/
- public void createTaskAndRelation(User loginUser, String projectName,
String relationName,
- ProcessDefinition processDefinition,
- ProcessData processData) {
+ public int createTaskAndRelation(User operator,
+ Long projectCode,
+ ProcessDefinition processDefinition,
+ ProcessData processData) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new
ArrayList<>() : processData.getTasks();
- for (TaskNode task : taskNodeList) {
- //TODO... task code exists, update task
- //createTaskDefinition(loginUser, projectName,
JSONUtils.toJsonString(task));
+ for (TaskNode taskNode : taskNodeList) {
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName());
+ if (taskDefinition == null) {
+ long code;
+ try {
+ code = SnowFlakeUtils.getInstance().nextId();
+ taskDefinition = new TaskDefinition();
+ taskDefinition.setCode(code);
+ } catch (SnowFlakeException e) {
+ logger.error("Task code get error, ", e);
+ return -1;
+ }
+ saveTaskDefinition(operator, projectCode, taskNode,
taskDefinition);
+ } else {
+ if (isTaskOnline(taskDefinition.getCode())) {
+ // TODO return something for fail
+ return -1;
+ }
+ updateTaskDefinition(operator, projectCode, taskNode,
taskDefinition);
+ }
+ }
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinition.getCode());
+ if (!processTaskRelationList.isEmpty()) {
+ processTaskRelationMapper.deleteByCode(projectCode,
processDefinition.getCode());
}
- // TODO: query taskCode by projectCode and taskName
+ // TODO parse taskNodeList for preTaskCode and postTaskCode
+ List<TaskNodeRelation> taskNodeRelationList =
DagHelper.getProcessDag(taskNodeList).getEdges();
+ Date now = new Date();
+ ProcessTaskRelation processTaskRelation = new
ProcessTaskRelation("",// todo relation name
+ processDefinition.getVersion(),
+ projectCode,
+ processDefinition.getCode(),
+ 0L, // todo pre task code
+ 0L, // todo post task code
+ ConditionType.of(""), // todo conditionType
+ "", // todo conditionParams
+ now,
+ now);
+ // save process task relation
+ int insert = processTaskRelationMapper.insert(processTaskRelation);
+ // save process task relation log
+ ProcessTaskRelationLog processTaskRelationLog = new
ProcessTaskRelationLog();
+ processTaskRelationLog.set(processTaskRelation);
+ processTaskRelationLog.setOperator(operator.getId());
+ processTaskRelationLog.setOperateTime(now);
+ int logInsert =
processTaskRelationLogMapper.insert(processTaskRelationLog);
+ return insert & logInsert;
+ }
+
+ public int saveTaskDefinition(User operator, Long projectCode, TaskNode
taskNode, TaskDefinition taskDefinition) {
+ Date now = new Date();
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setUserId(operator.getId());
+ taskDefinition.setVersion(1);
+ taskDefinition.setUpdateTime(now);
+ taskDefinition.setCreateTime(now);
+ setTaskFromTaskNode(taskNode, taskDefinition);
+ // save the new task definition
+ int insert = taskDefinitionMapper.insert(taskDefinition);
+ // save task definition log
+ TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+ taskDefinitionLog.set(taskDefinition);
+ taskDefinitionLog.setOperator(operator.getId());
+ taskDefinitionLog.setOperateTime(now);
+ int logInsert = taskDefinitionLogMapper.insert(taskDefinitionLog);
+ return insert & logInsert;
}
+ public boolean isTaskOnline(Long taskCode) {
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByTaskCode(taskCode);
+ if (!processTaskRelationList.isEmpty()) {
+ Set<Long> processDefinitionCodes = processTaskRelationList
+ .stream()
+ .map(ProcessTaskRelation::getProcessDefinitionCode)
+ .collect(Collectors.toSet());
+ List<ProcessDefinition> processDefinitionList =
processDefineMapper.queryByCodes(processDefinitionCodes);
+ // check process definition is already online
+ for (ProcessDefinition processDefinition : processDefinitionList) {
+ if (processDefinition.getReleaseState() ==
ReleaseState.ONLINE) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
}
diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql
index d1439f6..6f5088c 100644
--- a/sql/dolphinscheduler-postgre.sql
+++ b/sql/dolphinscheduler-postgre.sql
@@ -394,12 +394,10 @@ DROP TABLE IF EXISTS t_ds_process_task_relation;
CREATE TABLE t_ds_process_task_relation (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
- version int DEFAULT NULL ,
+ process_definition_version int DEFAULT NULL ,
project_code bigint DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
- pre_project_code bigint DEFAULT NULL ,
pre_task_code bigint DEFAULT NULL ,
- post_project_code bigint DEFAULT NULL ,
post_task_code bigint DEFAULT NULL ,
condition_type int DEFAULT NULL ,
condition_params text ,
@@ -412,12 +410,10 @@ DROP TABLE IF EXISTS t_ds_process_task_relation_log;
CREATE TABLE t_ds_process_task_relation_log (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
- version int DEFAULT NULL ,
+ process_definition_version int DEFAULT NULL ,
project_code bigint DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
- pre_project_code bigint DEFAULT NULL ,
pre_task_code bigint DEFAULT NULL ,
- post_project_code bigint DEFAULT NULL ,
post_task_code bigint DEFAULT NULL ,
condition_type int DEFAULT NULL ,
condition_params text ,
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index a9d4752..640b55a 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -509,7 +509,7 @@ DROP TABLE IF EXISTS `t_ds_process_task_relation`;
CREATE TABLE `t_ds_process_task_relation` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`name` varchar(200) DEFAULT NULL COMMENT 'relation name',
- `version` int(11) DEFAULT NULL COMMENT 'relation version',
+ `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
@@ -528,7 +528,7 @@ DROP TABLE IF EXISTS `t_ds_process_task_relation_log`;
CREATE TABLE `t_ds_process_task_relation_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`name` varchar(200) DEFAULT NULL COMMENT 'relation name',
- `version` int(11) DEFAULT NULL COMMENT 'relation version',
+ `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',