This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 88cd37f [Improvement][API][num-1] save schedule when workflow is
offline in interface 'createEmptyProcessDefinition' (#7047)
88cd37f is described below
commit 88cd37ff0fd09c3fa15b9f3a98d181fd90abe1bb
Author: EdwardYang <[email protected]>
AuthorDate: Mon Nov 29 21:32:23 2021 +0800
[Improvement][API][num-1] save schedule when workflow is offline in
interface 'createEmptyProcessDefinition' (#7047)
* [Improvement] save schedule when workflow is offline
* [Improvement] roll back the transaction when creating/updating the schedule
fails.
* [Improvement] resolve merge conflict
* [Improvement] do not set projectName and processDefinitionName when
saving schedule
* [Improvement] do not set projectName and processDefinitionName when
saving schedule
Co-authored-by: edward-yang <[email protected]>
---
.../service/impl/ProcessDefinitionServiceImpl.java | 75 +++++++++++++---------
1 file changed, 44 insertions(+), 31 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index f472150..abe0706 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -1596,9 +1596,11 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
// save dag schedule
- Map<String, Object> scheduleResult = createDagSchedule(loginUser,
projectCode, processDefinitionCode, scheduleJson);
+ Map<String, Object> scheduleResult = createDagSchedule(loginUser,
project, processDefinition, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
- return scheduleResult;
+ Status scheduleResultStatus = (Status)
scheduleResult.get(Constants.STATUS);
+ putMsg(result, scheduleResultStatus);
+ throw new ServiceException(scheduleResultStatus);
}
return result;
}
@@ -1616,40 +1618,48 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
private Map<String, Object> createDagSchedule(User loginUser,
- long projectCode,
- long processDefinitionCode,
+ Project project,
+ ProcessDefinition
processDefinition,
String scheduleJson) {
Map<String, Object> result = new HashMap<>();
- Schedule schedule = JSONUtils.parseObject(scheduleJson,
Schedule.class);
- if (schedule == null) {
+ Schedule scheduleObj = JSONUtils.parseObject(scheduleJson,
Schedule.class);
+ if (scheduleObj == null) {
putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
- // set default value
- FailureStrategy failureStrategy = schedule.getFailureStrategy() ==
null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
- WarningType warningType = schedule.getWarningType() == null ?
WarningType.NONE : schedule.getWarningType();
- Priority processInstancePriority =
schedule.getProcessInstancePriority() == null ? Priority.MEDIUM :
schedule.getProcessInstancePriority();
- int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 :
schedule.getWarningGroupId();
- String workerGroup = schedule.getWorkerGroup() == null ? "default" :
schedule.getWorkerGroup();
- long environmentCode = schedule.getEnvironmentCode() == null ? -1 :
schedule.getEnvironmentCode();
-
- ScheduleParam param = new ScheduleParam();
- param.setStartTime(schedule.getStartTime());
- param.setEndTime(schedule.getEndTime());
- param.setCrontab(schedule.getCrontab());
- param.setTimezoneId(schedule.getTimezoneId());
+ Date now = new Date();
+ scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
+ if (DateUtils.differSec(scheduleObj.getStartTime(),
scheduleObj.getEndTime()) == 0) {
+ logger.warn("The start time must not be the same as the end");
+ putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
+ return result;
+ }
+ if
(!org.quartz.CronExpression.isValidExpression(scheduleObj.getCrontab())) {
+ logger.error("{} verify failure", scheduleObj.getCrontab());
+ putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
scheduleObj.getCrontab());
+ return result;
+ }
+ scheduleObj.setWarningType(scheduleObj.getWarningType() == null ?
WarningType.NONE : scheduleObj.getWarningType());
+ scheduleObj.setWarningGroupId(scheduleObj.getWarningGroupId() == 0 ? 1
: scheduleObj.getWarningGroupId());
+ scheduleObj.setFailureStrategy(scheduleObj.getFailureStrategy() ==
null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy());
+ scheduleObj.setCreateTime(now);
+ scheduleObj.setUpdateTime(now);
+ scheduleObj.setUserId(loginUser.getId());
+ scheduleObj.setReleaseState(ReleaseState.OFFLINE);
+
scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority()
== null ? Priority.MEDIUM : scheduleObj.getProcessInstancePriority());
+ scheduleObj.setWorkerGroup(scheduleObj.getWorkerGroup() == null ?
"default" : scheduleObj.getWorkerGroup());
+ scheduleObj.setEnvironmentCode(scheduleObj.getEnvironmentCode() ==
null ? -1 : scheduleObj.getEnvironmentCode());
+ scheduleMapper.insert(scheduleObj);
+
+ /**
+ * updateProcessInstance receivers and cc by process definition id
+ */
+ processDefinition.setWarningGroupId(scheduleObj.getWarningGroupId());
+ processDefinitionMapper.updateById(processDefinition);
- return schedulerService.insertSchedule(
- loginUser,
- projectCode,
- processDefinitionCode,
- JSONUtils.toJsonString(param),
- warningType,
- warningGroupId,
- failureStrategy,
- processInstancePriority,
- workerGroup,
- environmentCode);
+ putMsg(result, Status.SUCCESS);
+ result.put("scheduleId", scheduleObj.getId());
+ return result;
}
/**
@@ -1668,6 +1678,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
* @return update result code
*/
@Override
+ @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> updateProcessDefinitionBasicInfo(User loginUser,
long
projectCode,
String name,
@@ -1728,7 +1739,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
// update dag schedule
Map<String, Object> scheduleResult = updateDagSchedule(loginUser,
projectCode, code, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
- return scheduleResult;
+ Status scheduleResultStatus = (Status)
scheduleResult.get(Constants.STATUS);
+ putMsg(result, scheduleResultStatus);
+ throw new ServiceException(scheduleResultStatus);
}
return result;
}