This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1e99732  add processDefinition releaseWorkflowAndSchedule (#7044)
1e99732 is described below

commit 1e9973299fbb7109959941951f3695667bf7e8c8
Author: JinYong Li <[email protected]>
AuthorDate: Mon Nov 29 20:13:49 2021 +0800

    add processDefinition releaseWorkflowAndSchedule (#7044)
---
 .../controller/ProcessDefinitionController.java    |  24 -----
 .../api/service/ProcessDefinitionService.java      |  12 ---
 .../service/impl/ProcessDefinitionServiceImpl.java | 107 ++++++++++++++++-----
 3 files changed, 82 insertions(+), 61 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index ec7c5f9..d6c1849 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -813,28 +813,4 @@ public class ProcessDefinitionController extends 
BaseController {
                                              @RequestParam(value = 
"releaseState", required = true, defaultValue = "OFFLINE") ReleaseState 
releaseState) {
         return 
returnDataList(processDefinitionService.releaseWorkflowAndSchedule(loginUser, 
projectCode, code, releaseState));
     }
-
-    /**
-     * delete process definition and schedule
-     *
-     * @param loginUser login user
-     * @param projectCode project code
-     * @param code process definition code
-     * @return update result code
-     */
-    @ApiOperation(value = "deleteWorkflowAndSchedule", notes = 
"DELETE_WORKFLOW_SCHEDULE_NOTES")
-    @ApiImplicitParams({
-        @ApiImplicitParam(name = "projectCode", value = 
"PROCESS_DEFINITION_NAME", required = true, type = "Long"),
-        @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", 
required = true, dataType = "Long", example = "123456789")
-    })
-    @DeleteMapping(value = "/{code}/delete-workflow")
-    @ResponseStatus(HttpStatus.OK)
-    @ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR)
-    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
-    public Result deleteWorkflowAndSchedule(@ApiIgnore @RequestAttribute(value 
= Constants.SESSION_USER) User loginUser,
-                                            @ApiParam(name = "projectCode", 
value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
-                                            @PathVariable(value = "code", 
required = true) long code) {
-        return 
returnDataList(processDefinitionService.deleteWorkflowAndSchedule(loginUser, 
projectCode, code));
-    }
-
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index e1e0503..2f1c3f7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -397,17 +397,5 @@ public interface ProcessDefinitionService {
                                                    long projectCode,
                                                    long code,
                                                    ReleaseState releaseState);
-
-    /**
-     * delete process definition and schedule
-     *
-     * @param loginUser login user
-     * @param projectCode project code
-     * @param code process definition code
-     * @return update result code
-     */
-    Map<String, Object> deleteWorkflowAndSchedule(User loginUser,
-                                                  long projectCode,
-                                                  long code);
 }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 2a3c14d..f472150 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -75,6 +75,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -719,23 +720,28 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
 
         ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(code);
-
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
+            return result;
+        }
         switch (releaseState) {
             case ONLINE:
+                List<ProcessTaskRelation> relationList = 
processService.findRelationByCode(projectCode, code);
+                if (CollectionUtils.isEmpty(relationList)) {
+                    putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
+                    return result;
+                }
                 processDefinition.setReleaseState(releaseState);
                 processDefinitionMapper.updateById(processDefinition);
                 break;
             case OFFLINE:
                 processDefinition.setReleaseState(releaseState);
                 int updateProcess = 
processDefinitionMapper.updateById(processDefinition);
-                List<Schedule> scheduleList = 
scheduleMapper.selectAllByProcessDefineArray(
-                        new long[]{processDefinition.getCode()}
-                );
-                if (updateProcess > 0 && scheduleList.size() == 1) {
-                    Schedule schedule = scheduleList.get(0);
-                    logger.info("set schedule offline, project id: {}, 
schedule id: {}, process definition code: {}", project.getId(), 
schedule.getId(), code);
+                Schedule schedule = 
scheduleMapper.queryByProcessDefinitionCode(code);
+                if (updateProcess > 0 && schedule != null) {
+                    logger.info("set schedule offline, project code: {}, 
schedule id: {}, process definition code: {}", projectCode, schedule.getId(), 
code);
                     // set status
-                    schedule.setReleaseState(ReleaseState.OFFLINE);
+                    schedule.setReleaseState(releaseState);
                     int updateSchedule = scheduleMapper.updateById(schedule);
                     if (updateSchedule == 0) {
                         putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
@@ -1585,7 +1591,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
 
-        if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+        if (StringUtils.isBlank(scheduleJson)) {
             return result;
         }
 
@@ -1613,14 +1619,19 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                                                   long projectCode,
                                                   long processDefinitionCode,
                                                   String scheduleJson) {
+        Map<String, Object> result = new HashMap<>();
         Schedule schedule = JSONUtils.parseObject(scheduleJson, 
Schedule.class);
+        if (schedule == null) {
+            putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
+            throw new ServiceException(Status.DATA_IS_NOT_VALID);
+        }
         // set default value
         FailureStrategy failureStrategy = schedule.getFailureStrategy() == 
null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
         WarningType warningType = schedule.getWarningType() == null ? 
WarningType.NONE : schedule.getWarningType();
         Priority processInstancePriority = 
schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : 
schedule.getProcessInstancePriority();
         int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : 
schedule.getWarningGroupId();
         String workerGroup = schedule.getWorkerGroup() == null ? "default" : 
schedule.getWorkerGroup();
-        Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : 
schedule.getEnvironmentCode();
+        long environmentCode = schedule.getEnvironmentCode() == null ? -1 : 
schedule.getEnvironmentCode();
 
         ScheduleParam param = new ScheduleParam();
         param.setStartTime(schedule.getStartTime());
@@ -1711,7 +1722,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             return result;
         }
 
-        if (scheduleJson == null || scheduleJson.trim().isEmpty()) {
+        if (StringUtils.isBlank(scheduleJson)) {
             return result;
         }
         // update dag schedule
@@ -1746,14 +1757,19 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                                                   long projectCode,
                                                   long processDefinitionCode,
                                                   String scheduleJson) {
+        Map<String, Object> result = new HashMap<>();
         Schedule schedule = JSONUtils.parseObject(scheduleJson, 
Schedule.class);
+        if (schedule == null) {
+            putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
+            throw new ServiceException(Status.DATA_IS_NOT_VALID);
+        }
         // set default value
         FailureStrategy failureStrategy = schedule.getFailureStrategy() == 
null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
         WarningType warningType = schedule.getWarningType() == null ? 
WarningType.NONE : schedule.getWarningType();
         Priority processInstancePriority = 
schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : 
schedule.getProcessInstancePriority();
         int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : 
schedule.getWarningGroupId();
         String workerGroup = schedule.getWorkerGroup() == null ? "default" : 
schedule.getWorkerGroup();
-        Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : 
schedule.getEnvironmentCode();
+        long environmentCode = schedule.getEnvironmentCode() == null ? -1 : 
schedule.getEnvironmentCode();
 
         ScheduleParam param = new ScheduleParam();
         param.setStartTime(schedule.getStartTime());
@@ -1783,21 +1799,62 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
      * @param releaseState releaseState
      * @return update result code
      */
+    @Transactional(rollbackFor = RuntimeException.class)
     @Override
     public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long 
projectCode, long code, ReleaseState releaseState) {
-        return null;
-    }
+        Project project = projectMapper.queryByCode(projectCode);
+        //check user access for project
+        Map<String, Object> result = 
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+        // check state
+        if (null == releaseState) {
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, 
RELEASESTATE);
+            return result;
+        }
 
-    /**
-     * delete process definition and schedule
-     *
-     * @param loginUser login user
-     * @param projectCode project code
-     * @param code process definition code
-     * @return update result code
-     */
-    @Override
-    public Map<String, Object> deleteWorkflowAndSchedule(User loginUser, long 
projectCode, long code) {
-        return null;
+        ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(code);
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
+            return result;
+        }
+        Schedule scheduleObj = 
scheduleMapper.queryByProcessDefinitionCode(code);
+        if (scheduleObj == null) {
+            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, 
"processDefinitionCode:" + code);
+            return result;
+        }
+        switch (releaseState) {
+            case ONLINE:
+                List<ProcessTaskRelation> relationList = 
processService.findRelationByCode(projectCode, code);
+                if (CollectionUtils.isEmpty(relationList)) {
+                    putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
+                    return result;
+                }
+                processDefinition.setReleaseState(releaseState);
+                processDefinitionMapper.updateById(processDefinition);
+                scheduleMapper.updateById(scheduleObj);
+                break;
+            case OFFLINE:
+                processDefinition.setReleaseState(releaseState);
+                int updateProcess = 
processDefinitionMapper.updateById(processDefinition);
+                if (updateProcess > 0) {
+                    logger.info("set schedule offline, project code: {}, 
schedule id: {}, process definition code: {}", projectCode, 
scheduleObj.getId(), code);
+                    // set status
+                    scheduleObj.setReleaseState(ReleaseState.OFFLINE);
+                    int updateSchedule = 
scheduleMapper.updateById(scheduleObj);
+                    if (updateSchedule == 0) {
+                        putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
+                        throw new 
ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
+                    }
+                    schedulerService.deleteSchedule(project.getId(), 
scheduleObj.getId());
+                }
+                break;
+            default:
+                putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, 
RELEASESTATE);
+                return result;
+        }
+        putMsg(result, Status.SUCCESS);
+        return result;
     }
 }

Reply via email to