This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 88cd37f  [Improvement][API][num-1] save schedule when workflow is offline in interface 'createEmptyProcessDefinition' (#7047)
88cd37f is described below

commit 88cd37ff0fd09c3fa15b9f3a98d181fd90abe1bb
Author: EdwardYang <[email protected]>
AuthorDate: Mon Nov 29 21:32:23 2021 +0800

    [Improvement][API][num-1] save schedule when workflow is offline in interface 'createEmptyProcessDefinition' (#7047)
    
    * [Improvement] save schedule when workflow is offline
    
    * [Improvement] roll back the transaction when creating/updating a schedule fails
    
    * [Improvement] resolve merge conflict
    
    * [Improvement] do not set projectName and processDefinitionName when saving schedule
    
    * [Improvement] do not set projectName and processDefinitionName when saving schedule
    
    Co-authored-by: edward-yang <[email protected]>
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 75 +++++++++++++---------
 1 file changed, 44 insertions(+), 31 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index f472150..abe0706 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -1596,9 +1596,11 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
 
         // save dag schedule
-        Map<String, Object> scheduleResult = createDagSchedule(loginUser, 
projectCode, processDefinitionCode, scheduleJson);
+        Map<String, Object> scheduleResult = createDagSchedule(loginUser, 
project, processDefinition, scheduleJson);
         if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
-            return scheduleResult;
+            Status scheduleResultStatus = (Status) 
scheduleResult.get(Constants.STATUS);
+            putMsg(result, scheduleResultStatus);
+            throw new ServiceException(scheduleResultStatus);
         }
         return result;
     }
@@ -1616,40 +1618,48 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     }
 
     private Map<String, Object> createDagSchedule(User loginUser,
-                                                  long projectCode,
-                                                  long processDefinitionCode,
+                                                  Project project,
+                                                  ProcessDefinition 
processDefinition,
                                                   String scheduleJson) {
         Map<String, Object> result = new HashMap<>();
-        Schedule schedule = JSONUtils.parseObject(scheduleJson, 
Schedule.class);
-        if (schedule == null) {
+        Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, 
Schedule.class);
+        if (scheduleObj == null) {
             putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
             throw new ServiceException(Status.DATA_IS_NOT_VALID);
         }
-        // set default value
-        FailureStrategy failureStrategy = schedule.getFailureStrategy() == 
null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
-        WarningType warningType = schedule.getWarningType() == null ? 
WarningType.NONE : schedule.getWarningType();
-        Priority processInstancePriority = 
schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : 
schedule.getProcessInstancePriority();
-        int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : 
schedule.getWarningGroupId();
-        String workerGroup = schedule.getWorkerGroup() == null ? "default" : 
schedule.getWorkerGroup();
-        long environmentCode = schedule.getEnvironmentCode() == null ? -1 : 
schedule.getEnvironmentCode();
-
-        ScheduleParam param = new ScheduleParam();
-        param.setStartTime(schedule.getStartTime());
-        param.setEndTime(schedule.getEndTime());
-        param.setCrontab(schedule.getCrontab());
-        param.setTimezoneId(schedule.getTimezoneId());
+        Date now = new Date();
+        scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
+        if (DateUtils.differSec(scheduleObj.getStartTime(), 
scheduleObj.getEndTime()) == 0) {
+            logger.warn("The start time must not be the same as the end");
+            putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
+            return result;
+        }
+        if 
(!org.quartz.CronExpression.isValidExpression(scheduleObj.getCrontab())) {
+            logger.error("{} verify failure", scheduleObj.getCrontab());
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, 
scheduleObj.getCrontab());
+            return result;
+        }
+        scheduleObj.setWarningType(scheduleObj.getWarningType() == null ? 
WarningType.NONE : scheduleObj.getWarningType());
+        scheduleObj.setWarningGroupId(scheduleObj.getWarningGroupId() == 0 ? 1 
: scheduleObj.getWarningGroupId());
+        scheduleObj.setFailureStrategy(scheduleObj.getFailureStrategy() == 
null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy());
+        scheduleObj.setCreateTime(now);
+        scheduleObj.setUpdateTime(now);
+        scheduleObj.setUserId(loginUser.getId());
+        scheduleObj.setReleaseState(ReleaseState.OFFLINE);
+        
scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority() 
== null ? Priority.MEDIUM : scheduleObj.getProcessInstancePriority());
+        scheduleObj.setWorkerGroup(scheduleObj.getWorkerGroup() == null ? 
"default" : scheduleObj.getWorkerGroup());
+        scheduleObj.setEnvironmentCode(scheduleObj.getEnvironmentCode() == 
null ? -1 : scheduleObj.getEnvironmentCode());
+        scheduleMapper.insert(scheduleObj);
+
+        /**
+         * updateProcessInstance receivers and cc by process definition id
+         */
+        processDefinition.setWarningGroupId(scheduleObj.getWarningGroupId());
+        processDefinitionMapper.updateById(processDefinition);
 
-        return schedulerService.insertSchedule(
-                loginUser,
-                projectCode,
-                processDefinitionCode,
-                JSONUtils.toJsonString(param),
-                warningType,
-                warningGroupId,
-                failureStrategy,
-                processInstancePriority,
-                workerGroup,
-                environmentCode);
+        putMsg(result, Status.SUCCESS);
+        result.put("scheduleId", scheduleObj.getId());
+        return result;
     }
 
     /**
@@ -1668,6 +1678,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
      * @return update result code
      */
     @Override
+    @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> updateProcessDefinitionBasicInfo(User loginUser,
                                                                 long 
projectCode,
                                                                 String name,
@@ -1728,7 +1739,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         // update dag schedule
         Map<String, Object> scheduleResult = updateDagSchedule(loginUser, 
projectCode, code, scheduleJson);
         if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
-            return scheduleResult;
+            Status scheduleResultStatus = (Status) 
scheduleResult.get(Constants.STATUS);
+            putMsg(result, scheduleResultStatus);
+            throw new ServiceException(scheduleResultStatus);
         }
         return result;
     }

Reply via email to