This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1e99732 add processDefinition releaseWorkflowAndSchedule (#7044)
1e99732 is described below
commit 1e9973299fbb7109959941951f3695667bf7e8c8
Author: JinYong Li <[email protected]>
AuthorDate: Mon Nov 29 20:13:49 2021 +0800
add processDefinition releaseWorkflowAndSchedule (#7044)
---
.../controller/ProcessDefinitionController.java | 24 -----
.../api/service/ProcessDefinitionService.java | 12 ---
.../service/impl/ProcessDefinitionServiceImpl.java | 107 ++++++++++++++++-----
3 files changed, 82 insertions(+), 61 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index ec7c5f9..d6c1849 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -813,28 +813,4 @@ public class ProcessDefinitionController extends
BaseController {
@RequestParam(value =
"releaseState", required = true, defaultValue = "OFFLINE") ReleaseState
releaseState) {
return
returnDataList(processDefinitionService.releaseWorkflowAndSchedule(loginUser,
projectCode, code, releaseState));
}
-
- /**
- * delete process definition and schedule
- *
- * @param loginUser login user
- * @param projectCode project code
- * @param code process definition code
- * @return update result code
- */
- @ApiOperation(value = "deleteWorkflowAndSchedule", notes =
"DELETE_WORKFLOW_SCHEDULE_NOTES")
- @ApiImplicitParams({
- @ApiImplicitParam(name = "projectCode", value =
"PROCESS_DEFINITION_NAME", required = true, type = "Long"),
- @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE",
required = true, dataType = "Long", example = "123456789")
- })
- @DeleteMapping(value = "/{code}/delete-workflow")
- @ResponseStatus(HttpStatus.OK)
- @ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR)
- @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
- public Result deleteWorkflowAndSchedule(@ApiIgnore @RequestAttribute(value
= Constants.SESSION_USER) User loginUser,
- @ApiParam(name = "projectCode",
value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
- @PathVariable(value = "code",
required = true) long code) {
- return
returnDataList(processDefinitionService.deleteWorkflowAndSchedule(loginUser,
projectCode, code));
- }
-
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index e1e0503..2f1c3f7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -397,17 +397,5 @@ public interface ProcessDefinitionService {
long projectCode,
long code,
ReleaseState releaseState);
-
- /**
- * delete process definition and schedule
- *
- * @param loginUser login user
- * @param projectCode project code
- * @param code process definition code
- * @return update result code
- */
- Map<String, Object> deleteWorkflowAndSchedule(User loginUser,
- long projectCode,
- long code);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 2a3c14d..f472150 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -75,6 +75,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import java.io.BufferedOutputStream;
import java.io.IOException;
@@ -719,23 +720,28 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
-
+ if (processDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
+ return result;
+ }
switch (releaseState) {
case ONLINE:
+ List<ProcessTaskRelation> relationList =
processService.findRelationByCode(projectCode, code);
+ if (CollectionUtils.isEmpty(relationList)) {
+ putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
+ return result;
+ }
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess =
processDefinitionMapper.updateById(processDefinition);
- List<Schedule> scheduleList =
scheduleMapper.selectAllByProcessDefineArray(
- new long[]{processDefinition.getCode()}
- );
- if (updateProcess > 0 && scheduleList.size() == 1) {
- Schedule schedule = scheduleList.get(0);
- logger.info("set schedule offline, project id: {},
schedule id: {}, process definition code: {}", project.getId(),
schedule.getId(), code);
+ Schedule schedule =
scheduleMapper.queryByProcessDefinitionCode(code);
+ if (updateProcess > 0 && schedule != null) {
+ logger.info("set schedule offline, project code: {},
schedule id: {}, process definition code: {}", projectCode, schedule.getId(),
code);
// set status
- schedule.setReleaseState(ReleaseState.OFFLINE);
+ schedule.setReleaseState(releaseState);
int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) {
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
@@ -1585,7 +1591,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+ if (StringUtils.isBlank(scheduleJson)) {
return result;
}
@@ -1613,14 +1619,19 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
long projectCode,
long processDefinitionCode,
String scheduleJson) {
+ Map<String, Object> result = new HashMap<>();
Schedule schedule = JSONUtils.parseObject(scheduleJson,
Schedule.class);
+ if (schedule == null) {
+ putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
+ throw new ServiceException(Status.DATA_IS_NOT_VALID);
+ }
// set default value
FailureStrategy failureStrategy = schedule.getFailureStrategy() ==
null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
WarningType warningType = schedule.getWarningType() == null ?
WarningType.NONE : schedule.getWarningType();
Priority processInstancePriority =
schedule.getProcessInstancePriority() == null ? Priority.MEDIUM :
schedule.getProcessInstancePriority();
int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 :
schedule.getWarningGroupId();
String workerGroup = schedule.getWorkerGroup() == null ? "default" :
schedule.getWorkerGroup();
- Long environmentCode = schedule.getEnvironmentCode() == null ? -1 :
schedule.getEnvironmentCode();
+ long environmentCode = schedule.getEnvironmentCode() == null ? -1 :
schedule.getEnvironmentCode();
ScheduleParam param = new ScheduleParam();
param.setStartTime(schedule.getStartTime());
@@ -1711,7 +1722,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
- if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+ if (StringUtils.isBlank(scheduleJson)) {
return result;
}
// update dag schedule
@@ -1746,14 +1757,19 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
long projectCode,
long processDefinitionCode,
String scheduleJson) {
+ Map<String, Object> result = new HashMap<>();
Schedule schedule = JSONUtils.parseObject(scheduleJson,
Schedule.class);
+ if (schedule == null) {
+ putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
+ throw new ServiceException(Status.DATA_IS_NOT_VALID);
+ }
// set default value
FailureStrategy failureStrategy = schedule.getFailureStrategy() ==
null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
WarningType warningType = schedule.getWarningType() == null ?
WarningType.NONE : schedule.getWarningType();
Priority processInstancePriority =
schedule.getProcessInstancePriority() == null ? Priority.MEDIUM :
schedule.getProcessInstancePriority();
int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 :
schedule.getWarningGroupId();
String workerGroup = schedule.getWorkerGroup() == null ? "default" :
schedule.getWorkerGroup();
- Long environmentCode = schedule.getEnvironmentCode() == null ? -1 :
schedule.getEnvironmentCode();
+ long environmentCode = schedule.getEnvironmentCode() == null ? -1 :
schedule.getEnvironmentCode();
ScheduleParam param = new ScheduleParam();
param.setStartTime(schedule.getStartTime());
@@ -1783,21 +1799,62 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
* @param releaseState releaseState
* @return update result code
*/
+ @Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long
projectCode, long code, ReleaseState releaseState) {
- return null;
- }
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ // check state
+ if (null == releaseState) {
+ putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
RELEASESTATE);
+ return result;
+ }
- /**
- * delete process definition and schedule
- *
- * @param loginUser login user
- * @param projectCode project code
- * @param code process definition code
- * @return update result code
- */
- @Override
- public Map<String, Object> deleteWorkflowAndSchedule(User loginUser, long
projectCode, long code) {
- return null;
+ ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
+ if (processDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
+ return result;
+ }
+ Schedule scheduleObj =
scheduleMapper.queryByProcessDefinitionCode(code);
+ if (scheduleObj == null) {
+ putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS,
"processDefinitionCode:" + code);
+ return result;
+ }
+ switch (releaseState) {
+ case ONLINE:
+ List<ProcessTaskRelation> relationList =
processService.findRelationByCode(projectCode, code);
+ if (CollectionUtils.isEmpty(relationList)) {
+ putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
+ return result;
+ }
+ processDefinition.setReleaseState(releaseState);
+ processDefinitionMapper.updateById(processDefinition);
+ scheduleMapper.updateById(scheduleObj);
+ break;
+ case OFFLINE:
+ processDefinition.setReleaseState(releaseState);
+ int updateProcess =
processDefinitionMapper.updateById(processDefinition);
+ if (updateProcess > 0) {
+ logger.info("set schedule offline, project code: {},
schedule id: {}, process definition code: {}", projectCode,
scheduleObj.getId(), code);
+ // set status
+ scheduleObj.setReleaseState(ReleaseState.OFFLINE);
+ int updateSchedule =
scheduleMapper.updateById(scheduleObj);
+ if (updateSchedule == 0) {
+ putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
+ throw new
ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
+ }
+ schedulerService.deleteSchedule(project.getId(),
scheduleObj.getId());
+ }
+ break;
+ default:
+ putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
RELEASESTATE);
+ return result;
+ }
+ putMsg(result, Status.SUCCESS);
+ return result;
}
}