qingwli commented on code in PR #15198:
URL:
https://github.com/apache/dolphinscheduler/pull/15198#discussion_r1403832942
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2820,147 +2504,42 @@ public int saveTaskRelation(User loginUser,
ProcessDefinition processDefinition,
return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS :
Constants.EXIT_CODE_FAILURE;
}
- protected Map<String, Object> updateDagSchedule(User loginUser,
- long projectCode,
- long processDefinitionCode,
- String scheduleJson) {
- Map<String, Object> result = new HashMap<>();
- Schedule schedule = JSONUtils.parseObject(scheduleJson,
Schedule.class);
- if (schedule == null) {
- putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
- throw new ServiceException(Status.DATA_IS_NOT_VALID);
- }
- // set default value
- FailureStrategy failureStrategy =
- ObjectUtils.defaultIfNull(schedule.getFailureStrategy(),
FailureStrategy.CONTINUE);
- WarningType warningType =
ObjectUtils.defaultIfNull(schedule.getWarningType(), WarningType.NONE);
- Priority processInstancePriority =
-
ObjectUtils.defaultIfNull(schedule.getProcessInstancePriority(),
Priority.MEDIUM);
- int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 :
schedule.getWarningGroupId();
- String workerGroup =
StringUtils.defaultIfBlank(schedule.getWorkerGroup(), DEFAULT_WORKER_GROUP);
- String tenantCode =
StringUtils.defaultIfBlank(schedule.getTenantCode(), Constants.DEFAULT);
- long environmentCode = schedule.getEnvironmentCode() == null ? -1 :
schedule.getEnvironmentCode();
-
- ScheduleParam param = new ScheduleParam();
- param.setStartTime(schedule.getStartTime());
- param.setEndTime(schedule.getEndTime());
- param.setCrontab(schedule.getCrontab());
- param.setTimezoneId(schedule.getTimezoneId());
-
- return schedulerService.updateScheduleByProcessDefinitionCode(
- loginUser,
- projectCode,
- processDefinitionCode,
- JSONUtils.toJsonString(param),
- warningType,
- warningGroupId,
- failureStrategy,
- processInstancePriority,
- workerGroup,
- tenantCode,
- environmentCode);
- }
-
- /**
- * release process definition and schedule
- *
- * @param loginUser login user
- * @param projectCode project code
- * @param code process definition code
- * @param releaseState releaseState
- * @return update result code
- */
@Transactional
@Override
- public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long
projectCode, long code,
- ReleaseState
releaseState) {
- Project project = projectMapper.queryByCode(projectCode);
- // check user access for project
- Map<String, Object> result =
- projectService.checkProjectAndAuth(loginUser, project,
projectCode, WORKFLOW_ONLINE_OFFLINE);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
- // check state
- if (null == releaseState) {
- putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
RELEASESTATE);
- return result;
- }
+ public void onlineWorkflowDefinition(User loginUser, Long projectCode,
Long workflowDefinitionCode) {
+ projectService.checkProjectAndAuthThrowException(loginUser,
projectCode, WORKFLOW_ONLINE_OFFLINE);
- ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (processDefinition == null) {
- log.error("Process definition does not exist, code:{}.", code);
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(code));
- return result;
- }
- Schedule scheduleObj =
scheduleMapper.queryByProcessDefinitionCode(code);
- if (scheduleObj == null) {
- log.error("Schedule cron does not exist,
processDefinitionCode:{}.", code);
- putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS,
"processDefinitionCode:" + code);
- return result;
- }
- switch (releaseState) {
- case ONLINE:
- List<ProcessTaskRelation> relationList =
- processService.findRelationByCode(code,
processDefinition.getVersion());
- if (CollectionUtils.isEmpty(relationList)) {
- log.warn("Process definition has no task relation,
processDefinitionCode:{}.", code);
- putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
- return result;
- }
- processDefinition.setReleaseState(releaseState);
- processDefinitionMapper.updateById(processDefinition);
- schedulerService.setScheduleState(loginUser, projectCode,
scheduleObj.getId(), ReleaseState.ONLINE);
- break;
- case OFFLINE:
- processDefinition.setReleaseState(releaseState);
- int updateProcess =
processDefinitionMapper.updateById(processDefinition);
- if (updateProcess > 0) {
- log.info("Set schedule offline, projectCode:{},
processDefinitionCode:{}, scheduleId:{}.",
- projectCode, code, scheduleObj.getId());
- // set status
- scheduleObj.setReleaseState(ReleaseState.OFFLINE);
- int updateSchedule =
scheduleMapper.updateById(scheduleObj);
- if (updateSchedule == 0) {
- log.error(
- "Set schedule offline error, projectCode:{},
processDefinitionCode:{}, scheduleId:{}",
- projectCode, code, scheduleObj.getId());
- putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
- throw new
ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
- }
- schedulerService.deleteSchedule(project.getId(),
scheduleObj.getId());
- }
- break;
- default:
- putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
RELEASESTATE);
- return result;
+ ProcessDefinition workflowDefinition =
processDefinitionDao.queryByCode(workflowDefinitionCode)
+ .orElseThrow(() -> new
ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode));
+
+ if (ReleaseState.ONLINE.equals(workflowDefinition.getReleaseState())) {
+ // do nothing if the workflow is already online
+ return;
}
- putMsg(result, Status.SUCCESS);
- return result;
- }
- /**
- * save other relation
- * @param loginUser
- * @param processDefinition
- * @param result
- * @param otherParamsJson
- */
- @Override
- public void saveOtherRelation(User loginUser, ProcessDefinition
processDefinition, Map<String, Object> result,
- String otherParamsJson) {
+ checkWorkflowDefinitionIsValidated(workflowDefinition.getCode());
+ checkAllSubWorkflowDefinitionIsOnline(workflowDefinition.getCode());
+ workflowDefinition.setReleaseState(ReleaseState.ONLINE);
+ processDefinitionDao.updateById(workflowDefinition);
}
- /**
- * get Json String
- * @param loginUser
- * @param processDefinition
- * @return Json String
- */
+ @Transactional
@Override
- public String doOtherOperateProcess(User loginUser, ProcessDefinition
processDefinition) {
- return null;
+ public void offlineWorkflowDefinition(User loginUser, Long projectCode,
Long workflowDefinitionCode) {
+ projectService.checkProjectAndAuthThrowException(loginUser,
projectCode, WORKFLOW_ONLINE_OFFLINE);
+
+ ProcessDefinition workflowDefinition =
processDefinitionDao.queryByCode(workflowDefinitionCode)
+ .orElseThrow(() -> new
ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode));
+
+ if (ReleaseState.OFFLINE.equals(workflowDefinition.getReleaseState()))
{
+ // do nothing if the workflow is already online
Review Comment:
```suggestion
// do nothing if the workflow is already offline
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]