This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 426567348e Remove quartz in service (#10748)
426567348e is described below
commit 426567348ec3dee46171c2cee4b44dd01c0a97a3
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jul 6 15:43:55 2022 +0800
Remove quartz in service (#10748)
* Remove quartz in service
---
.../api/service/impl/BaseServiceImpl.java | 4 +-
.../api/service/impl/DataAnalysisServiceImpl.java | 4 +-
.../service/impl/DqExecuteResultServiceImpl.java | 4 +-
.../api/service/impl/DqRuleServiceImpl.java | 4 +-
.../api/service/impl/ExecutorServiceImpl.java | 384 ++++++++++++---------
.../service/impl/ProcessInstanceServiceImpl.java | 2 +-
.../api/service/impl/SchedulerServiceImpl.java | 165 ++++-----
.../api/service/AuditServiceTest.java | 4 +-
.../api/service/DataAnalysisServiceTest.java | 59 ++--
.../api/service/DqExecuteResultServiceTest.java | 4 +-
.../api/service/DqRuleServiceTest.java | 4 +-
.../api/service/ProcessInstanceServiceTest.java | 8 +-
.../api/service/TaskInstanceServiceTest.java | 43 ++-
.../dolphinscheduler/common/utils/DateUtils.java | 60 ++--
.../master/runner/WorkflowExecuteRunnable.java | 89 ++---
dolphinscheduler-service/pom.xml | 18 -
.../service/{corn => cron}/AbstractCycle.java | 2 +-
.../service/{corn => cron}/CronUtils.java | 173 +++++-----
.../service/{corn => cron}/CycleFactory.java | 2 +-
.../service/{corn => cron}/CycleLinks.java | 2 +-
.../service/exceptions/CronParseException.java | 29 ++
.../service/process/ProcessService.java | 3 +-
.../service/process/ProcessServiceImpl.java | 41 +--
.../service/cron/CronUtilsTest.java | 101 +++---
.../service/process/ProcessServiceTest.java | 5 +-
dolphinscheduler-worker/pom.xml | 6 -
26 files changed, 646 insertions(+), 574 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
index bdefefe026..4c3124a104 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
@@ -193,7 +193,7 @@ public class BaseServiceImpl implements BaseService {
Map<String, Object> result = new HashMap<>();
Date start = null;
if (!StringUtils.isEmpty(startDateStr)) {
- start = DateUtils.getScheduleDate(startDateStr);
+ start = DateUtils.stringToDate(startDateStr);
if (Objects.isNull(start)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
Constants.START_END_DATE);
return result;
@@ -203,7 +203,7 @@ public class BaseServiceImpl implements BaseService {
Date end = null;
if (!StringUtils.isEmpty(endDateStr)) {
- end = DateUtils.getScheduleDate(endDateStr);
+ end = DateUtils.stringToDate(endDateStr);
if (Objects.isNull(end)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
Constants.START_END_DATE);
return result;
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
index f4f544bae6..091387203e 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
@@ -166,8 +166,8 @@ public class DataAnalysisServiceImpl extends
BaseServiceImpl implements DataAnal
Date start = null;
Date end = null;
if (!StringUtils.isEmpty(startDate) && !StringUtils.isEmpty(endDate)) {
- start = DateUtils.getScheduleDate(startDate);
- end = DateUtils.getScheduleDate(endDate);
+ start = DateUtils.stringToDate(startDate);
+ end = DateUtils.stringToDate(endDate);
if (Objects.isNull(start) || Objects.isNull(end)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
Constants.START_END_DATE);
return result;
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
index c5ee6363a5..ae14e9e4af 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
@@ -66,10 +66,10 @@ public class DqExecuteResultServiceImpl extends
BaseServiceImpl implements DqExe
Date end = null;
try {
if (StringUtils.isNotEmpty(startTime)) {
- start = DateUtils.getScheduleDate(startTime);
+ start = DateUtils.stringToDate(startTime);
}
if (StringUtils.isNotEmpty(endTime)) {
- end = DateUtils.getScheduleDate(endTime);
+ end = DateUtils.stringToDate(endTime);
}
} catch (Exception e) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
"startTime,endTime");
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
index a4f78ec830..9c398d8609 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
@@ -163,10 +163,10 @@ public class DqRuleServiceImpl extends BaseServiceImpl
implements DqRuleService
Date end = null;
try {
if (StringUtils.isNotEmpty(startTime)) {
- start = DateUtils.getScheduleDate(startTime);
+ start = DateUtils.stringToDate(startTime);
}
if (StringUtils.isNotEmpty(endTime)) {
- end = DateUtils.getScheduleDate(endTime);
+ end = DateUtils.stringToDate(endTime);
}
} catch (Exception e) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR,
"startTime,endTime");
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 0e7d1b5abb..e8092ee9fa 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -17,12 +17,17 @@
package org.apache.dolphinscheduler.api.service.impl;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-import org.apache.commons.beanutils.BeanUtils;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
+import static
org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH;
+
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
@@ -66,16 +71,18 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.corn.CronUtils;
+import org.apache.dolphinscheduler.service.cron.CronUtils;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -83,16 +90,13 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
-import static
org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
/**
* executor service impl
@@ -135,38 +139,39 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* execute process instance
*
- * @param loginUser login user
- * @param projectCode project code
- * @param processDefinitionCode process definition code
- * @param cronTime cron time
- * @param commandType command type
- * @param failureStrategy failure strategy
- * @param startNodeList start nodelist
- * @param taskDependType node dependency type
- * @param warningType warning type
- * @param warningGroupId notify group id
- * @param processInstancePriority process instance priority
- * @param workerGroup worker group name
- * @param environmentCode environment code
- * @param runMode run mode
- * @param timeout timeout
- * @param startParams the global param values which pass to new process
instance
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param processDefinitionCode process definition code
+ * @param cronTime cron time
+ * @param commandType command type
+ * @param failureStrategy failure strategy
+ * @param startNodeList start nodelist
+ * @param taskDependType node dependency type
+ * @param warningType warning type
+ * @param warningGroupId notify group id
+ * @param processInstancePriority process instance priority
+ * @param workerGroup worker group name
+ * @param environmentCode environment code
+ * @param runMode run mode
+ * @param timeout timeout
+ * @param startParams the global param values which pass to
new process instance
* @param expectedParallelismNumber the expected parallelism number when
execute complement in parallel mode
* @return execute process instance code
*/
@Override
- public Map<String, Object> execProcessInstance(User loginUser, long
projectCode,
- long processDefinitionCode,
String cronTime, CommandType commandType,
+ public Map<String, Object> execProcessInstance(User loginUser, long
projectCode, long processDefinitionCode,
+ String cronTime,
CommandType commandType,
FailureStrategy
failureStrategy, String startNodeList,
- TaskDependType
taskDependType, WarningType warningType, int warningGroupId,
- RunMode runMode,
- Priority
processInstancePriority, String workerGroup, Long environmentCode,Integer
timeout,
+ TaskDependType
taskDependType, WarningType warningType,
+ int warningGroupId, RunMode
runMode,
+ Priority
processInstancePriority, String workerGroup,
+ Long environmentCode,
Integer timeout,
Map<String, String>
startParams, Integer expectedParallelismNumber,
- int dryRun,
- ComplementDependentMode
complementDependentMode) {
+ int dryRun,
ComplementDependentMode complementDependentMode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
WORKFLOW_START);
+ Map<String, Object> result =
+ projectService.checkProjectAndAuth(loginUser, project,
projectCode, WORKFLOW_START);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -185,12 +190,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process
definition: id:{},name:{}, ",
- processDefinition.getId(), processDefinition.getName());
+ processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
}
- if(!checkScheduleTimeNum(commandType,cronTime)){
+ if (!checkScheduleTimeNum(commandType, cronTime)) {
putMsg(result, Status.SCHEDULE_TIME_NUMBER);
return result;
}
@@ -202,10 +207,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* create command
*/
- int create = this.createCommand(commandType,
processDefinition.getCode(),
- taskDependType, failureStrategy, startNodeList, cronTime,
warningType, loginUser.getId(),
- warningGroupId, runMode, processInstancePriority, workerGroup,
environmentCode, startParams,
- expectedParallelismNumber, dryRun, complementDependentMode);
+ int create =
+ this.createCommand(commandType, processDefinition.getCode(),
taskDependType, failureStrategy, startNodeList,
+ cronTime, warningType, loginUser.getId(), warningGroupId,
runMode, processInstancePriority, workerGroup,
+ environmentCode, startParams, expectedParallelismNumber,
dryRun, complementDependentMode);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
@@ -236,19 +241,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
}
/**
- *
* @param complementData
* @param cronTime
* @return CommandType is COMPLEMENT_DATA and cronTime's number is not
greater than 100 return true , otherwise return false
*/
- private boolean checkScheduleTimeNum(CommandType complementData,String
cronTime) {
+ private boolean checkScheduleTimeNum(CommandType complementData, String
cronTime) {
if (!CommandType.COMPLEMENT_DATA.equals(complementData)) {
return true;
}
- if(cronTime == null){
+ if (cronTime == null) {
return true;
}
- Map<String,String> cronMap = JSONUtils.toMap(cronTime);
+ Map<String, String> cronMap = JSONUtils.toMap(cronTime);
if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
String[] stringDates =
cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
@@ -261,14 +265,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* check whether the process definition can be executed
*
- * @param projectCode project code
+ * @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
- * @param version process instance verison
+ * @param version process instance verison
* @return check result code
*/
@Override
- public Map<String, Object> checkProcessDefinitionValid(long projectCode,
ProcessDefinition processDefinition, long processDefineCode, Integer version) {
+ public Map<String, Object> checkProcessDefinitionValid(long projectCode,
ProcessDefinition processDefinition,
+ long
processDefineCode, Integer version) {
Map<String, Object> result = new HashMap<>();
if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
// check process definition exists
@@ -287,41 +292,47 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* check if the current process has subprocesses and all subprocesses are
valid
+ *
* @param processDefinition
* @return check result
*/
@Override
public boolean checkSubProcessDefinitionValid(ProcessDefinition
processDefinition) {
// query all subprocesses under the current process
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
+ List<ProcessTaskRelation> processTaskRelations =
+
processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
if (processTaskRelations.isEmpty()) {
return true;
}
- Set<Long> relationCodes =
processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet());
+ Set<Long> relationCodes =
+
processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet());
List<TaskDefinition> taskDefinitions =
taskDefinitionMapper.queryByCodeList(relationCodes);
// find out the process definition code
Set<Long> processDefinitionCodeSet = new HashSet<>();
taskDefinitions.stream()
- .filter(task ->
TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType()))
- .forEach(taskDefinition ->
processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(),
Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))));
+ .filter(task ->
TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())).forEach(
+ taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(
+ JSONUtils.getNodeString(taskDefinition.getTaskParams(),
Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))));
if (processDefinitionCodeSet.isEmpty()) {
return true;
}
// check sub releaseState
List<ProcessDefinition> processDefinitions =
processDefinitionMapper.queryByCodes(processDefinitionCodeSet);
- return processDefinitions.stream().filter(definition ->
definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty();
+ return processDefinitions.stream()
+ .filter(definition ->
definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet())
+ .isEmpty();
}
/**
* do action to process instance：pause, stop, repeat, recover from pause,
recover from stop
*
- * @param loginUser login user
- * @param projectCode project code
+ * @param loginUser login user
+ * @param projectCode project code
* @param processInstanceId process instance id
- * @param executeType execute type
+ * @param executeType execute type
* @return execute result code
*/
@Override
@@ -329,7 +340,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.map.get(executeType));
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
+ ApiFuncIdentificationConstant.map.get(executeType));
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -345,10 +357,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
return result;
}
- ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+ ProcessDefinition processDefinition =
+
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType !=
ExecuteType.PAUSE) {
- result = checkProcessDefinitionValid(projectCode,
processDefinition, processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
+ result =
+ checkProcessDefinitionValid(projectCode, processDefinition,
processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -360,12 +375,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process
definition: id:{},name:{}, ",
- processDefinition.getId(), processDefinition.getName());
+ processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
//get the startParams user specified at the first starting while
repeat running is needed
- Map<String, Object> commandMap =
JSONUtils.parseObject(processInstance.getCommandParam(), new
TypeReference<Map<String, Object>>() {});
+ Map<String, Object> commandMap =
+ JSONUtils.parseObject(processInstance.getCommandParam(), new
TypeReference<Map<String, Object>>() {
+ });
String startParams = null;
if (MapUtils.isNotEmpty(commandMap) && executeType ==
ExecuteType.REPEAT_RUNNING) {
Object startParamsJson =
commandMap.get(Constants.CMD_PARAM_START_PARAMS);
@@ -376,19 +393,24 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
switch (executeType) {
case REPEAT_RUNNING:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), processDefinition.getVersion(),
CommandType.REPEAT_RUNNING, startParams);
+ result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(),
+ processDefinition.getVersion(),
CommandType.REPEAT_RUNNING, startParams);
break;
case RECOVER_SUSPENDED_PROCESS:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), processDefinition.getVersion(),
CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
+ result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(),
+ processDefinition.getVersion(),
CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
break;
case START_FAILURE_TASK_PROCESS:
- result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(), processDefinition.getVersion(),
CommandType.START_FAILURE_TASK_PROCESS, startParams);
+ result = insertCommand(loginUser, processInstanceId,
processDefinition.getCode(),
+ processDefinition.getVersion(),
CommandType.START_FAILURE_TASK_PROCESS, startParams);
break;
case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
- putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED,
processInstance.getName(), processInstance.getState());
+ putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED,
processInstance.getName(),
+ processInstance.getState());
} else {
- result = updateProcessInstancePrepare(processInstance,
CommandType.STOP, ExecutionStatus.READY_STOP);
+ result =
+ updateProcessInstancePrepare(processInstance,
CommandType.STOP, ExecutionStatus.READY_STOP);
}
break;
case PAUSE:
@@ -432,8 +454,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* @return true if tenant suitable, otherwise return false
*/
private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
- Tenant tenant =
processService.getTenantForProcess(processDefinition.getTenantId(),
- processDefinition.getUserId());
+ Tenant tenant =
+
processService.getTenantForProcess(processDefinition.getTenantId(),
processDefinition.getUserId());
return tenant != null;
}
@@ -441,7 +463,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* Check the state of process instance and the type of operation match
*
* @param processInstance process instance
- * @param executeType execute type
+ * @param executeType execute type
* @return check result code
*/
private Map<String, Object> checkExecuteType(ProcessInstance
processInstance, ExecuteType executeType) {
@@ -475,7 +497,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
break;
}
if (!checkResult) {
- putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), executionStatus.toString(), executeType.toString());
+ putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(),
+ executionStatus.toString(), executeType.toString());
} else {
putMsg(result, Status.SUCCESS);
}
@@ -486,11 +509,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* prepare to update process instance command type and status
*
* @param processInstance process instance
- * @param commandType command type
+ * @param commandType command type
* @param executionStatus execute status
* @return update result
*/
- private Map<String, Object> updateProcessInstancePrepare(ProcessInstance
processInstance, CommandType commandType, ExecutionStatus executionStatus) {
+ private Map<String, Object> updateProcessInstancePrepare(ProcessInstance
processInstance, CommandType commandType,
+ ExecutionStatus
executionStatus) {
Map<String, Object> result = new HashMap<>();
processInstance.setCommandType(commandType);
@@ -528,8 +552,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
taskGroupQueue.setForceStart(Flag.YES.getCode());
processService.updateTaskGroupQueue(taskGroupQueue);
- processService.sendStartTask2Master(processInstance,
taskGroupQueue.getTaskId()
-
,org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
+ processService.sendStartTask2Master(processInstance,
taskGroupQueue.getTaskId(),
+
org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -537,14 +561,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* insert command, used in the implementation of the page, re run,
recovery (pause / failure) execution
*
- * @param loginUser login user
- * @param instanceId instance id
+ * @param loginUser login user
+ * @param instanceId instance id
* @param processDefinitionCode process definition code
* @param processVersion
- * @param commandType command type
+ * @param commandType command type
* @return insert result code
*/
- private Map<String, Object> insertCommand(User loginUser, Integer
instanceId, long processDefinitionCode, int processVersion, CommandType
commandType, String startParams) {
+ private Map<String, Object> insertCommand(User loginUser, Integer
instanceId, long processDefinitionCode,
+ int processVersion, CommandType
commandType, String startParams) {
Map<String, Object> result = new HashMap<>();
//To add startParams only when repeat running is needed
@@ -607,8 +632,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
*/
if (processDefinitionTmp.getReleaseState() !=
ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
processDefinitionTmp.getName());
- logger.info("not release process definition id: {} ,
name : {}",
- processDefinitionTmp.getId(),
processDefinitionTmp.getName());
+ logger.info("not release process definition id: {} ,
name : {}", processDefinitionTmp.getId(),
+ processDefinitionTmp.getName());
return result;
}
}
@@ -621,27 +646,27 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* create command
*
- * @param commandType commandType
- * @param processDefineCode processDefineCode
- * @param nodeDep nodeDep
- * @param failureStrategy failureStrategy
- * @param startNodeList startNodeList
- * @param schedule schedule
- * @param warningType warningType
- * @param executorId executorId
- * @param warningGroupId warningGroupId
- * @param runMode runMode
+ * @param commandType commandType
+ * @param processDefineCode processDefineCode
+ * @param nodeDep nodeDep
+ * @param failureStrategy failureStrategy
+ * @param startNodeList startNodeList
+ * @param schedule schedule
+ * @param warningType warningType
+ * @param executorId executorId
+ * @param warningGroupId warningGroupId
+ * @param runMode runMode
* @param processInstancePriority processInstancePriority
- * @param workerGroup workerGroup
- * @param environmentCode environmentCode
+ * @param workerGroup workerGroup
+ * @param environmentCode environmentCode
* @return command id
*/
- private int createCommand(CommandType commandType, long processDefineCode,
- TaskDependType nodeDep, FailureStrategy
failureStrategy,
- String startNodeList, String schedule,
WarningType warningType,
- int executorId, int warningGroupId,
- RunMode runMode, Priority
processInstancePriority, String workerGroup, Long environmentCode,
- Map<String, String> startParams, Integer
expectedParallelismNumber, int dryRun, ComplementDependentMode
complementDependentMode) {
+ private int createCommand(CommandType commandType, long processDefineCode,
TaskDependType nodeDep,
+ FailureStrategy failureStrategy, String
startNodeList, String schedule,
+ WarningType warningType, int executorId, int
warningGroupId, RunMode runMode,
+ Priority processInstancePriority, String
workerGroup, Long environmentCode,
+ Map<String, String> startParams, Integer
expectedParallelismNumber, int dryRun,
+ ComplementDependentMode complementDependentMode)
{
/**
* instantiate command schedule instance
@@ -689,11 +714,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
if (schedule == null || StringUtils.isEmpty(schedule)) {
return 0;
}
- int check = checkScheduleTime(schedule);
- if(check == 0){
+ if (!isValidateScheduleTime(schedule)) {
+ return 0;
+ }
+ try {
+ return createComplementCommandList(schedule, runMode, command,
expectedParallelismNumber,
+ complementDependentMode);
+ } catch (CronParseException cronParseException) {
+ // this just make compile happy, since we already validate the
cron before
+ logger.error("Parse cron error", cronParseException);
return 0;
}
- return createComplementCommandList(schedule, runMode, command,
expectedParallelismNumber, complementDependentMode);
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
@@ -709,7 +740,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* @return
*/
protected int createComplementCommandList(String scheduleTimeParam,
RunMode runMode, Command command,
- Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) {
+ Integer
expectedParallelismNumber,
+ ComplementDependentMode
complementDependentMode)
+ throws CronParseException {
int createCount = 0;
String startDate = null;
String endDate = null;
@@ -718,33 +751,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
-
if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ if
(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
dateList =
scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
dateList = removeDuplicates(dateList);
}
- if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) &&
scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+ if (scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) &&
scheduleParam.containsKey(
+ CMDPARAM_COMPLEMENT_DATA_END_DATE)) {
startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
}
switch (runMode) {
case RUN_MODE_SERIAL: {
- if(StringUtils.isNotEmpty(dateList)){
+ if (StringUtils.isNotEmpty(dateList)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,
dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command);
}
- if(startDate != null && endDate != null){
+ if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
startDate);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command);
// dependent process definition
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(
+ command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
logger.info("process code: {} complement dependent in
off mode or schedule's size is 0, skip "
- + "dependent complement data",
command.getProcessDefinitionCode());
+ + "dependent complement data",
command.getProcessDefinitionCode());
} else {
dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
}
@@ -752,10 +787,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
break;
}
case RUN_MODE_PARALLEL: {
- if(startDate != null && endDate != null){
- List<Date> listDate = new ArrayList<>();
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
-
listDate.addAll(CronUtils.getSelfFireDateList(DateUtils.getScheduleDate(startDate),
DateUtils.getScheduleDate(endDate), schedules));
+ if (startDate != null && endDate != null) {
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(
+ command.getProcessDefinitionCode());
+ List<ZonedDateTime> listDate = new ArrayList<>(
+
CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate),
+ DateUtils.stringToZoneDateTime(endDate),
schedules));
int listDateSize = listDate.size();
createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) {
@@ -791,15 +828,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
processService.createCommand(command);
if (schedules.isEmpty() || complementDependentMode
== ComplementDependentMode.OFF_MODE) {
- logger.info("process code: {} complement
dependent in off mode or schedule's size is 0, skip "
+ logger.info(
+ "process code: {} complement dependent in
off mode or schedule's size is 0, skip "
+ "dependent complement data",
command.getProcessDefinitionCode());
} else {
- dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ dependentProcessDefinitionCreateCount +=
+
createComplementDependentCommand(schedules, command);
}
}
}
}
- if(StringUtils.isNotEmpty(dateList)){
+ if (StringUtils.isNotEmpty(dateList)) {
List<String> listDate =
Arrays.asList(dateList.split(COMMA));
int listDateSize = listDate.size();
createCount = listDate.size();
@@ -823,8 +862,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
default:
break;
}
- logger.info("create complement command count: {}, create dependent
complement command count: {}", createCount
- , dependentProcessDefinitionCreateCount);
+ logger.info("create complement command count: {}, create dependent
complement command count: {}", createCount,
+ dependentProcessDefinitionCreateCount);
return createCount;
}
@@ -843,9 +882,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
}
List<DependentProcessDefinition> dependentProcessDefinitionList =
-
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
- CronUtils.getMaxCycle(schedules.get(0).getCrontab()),
- dependentCommand.getWorkerGroup());
+
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
+ CronUtils.getMaxCycle(schedules.get(0).getCrontab()),
dependentCommand.getWorkerGroup());
dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
@@ -864,33 +902,36 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* get complement dependent process definition list
*/
private List<DependentProcessDefinition>
getComplementDependentDefinitionList(long processDefinitionCode,
-
CycleEnum processDefinitionCycle,
-
String workerGroup) {
+
CycleEnum processDefinitionCycle,
+
String workerGroup) {
List<DependentProcessDefinition> dependentProcessDefinitionList =
-
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
+
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
- return
checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup);
+ return
checkDependentProcessDefinitionValid(dependentProcessDefinitionList,
processDefinitionCycle,
+ workerGroup);
}
/**
- * Check whether the dependency cycle of the dependent node is consistent
with the schedule cycle of
- * the dependent process definition and if there is no worker group in
the schedule, use the complement selection's
- * worker group
+ * Check whether the dependency cycle of the dependent node is consistent
with the schedule cycle of
+ * the dependent process definition and if there is no worker group in the
schedule, use the complement selection's
+ * worker group
*/
- private List<DependentProcessDefinition>
checkDependentProcessDefinitionValid(List<DependentProcessDefinition>
dependentProcessDefinitionList,
-
CycleEnum processDefinitionCycle,
-
String workerGroup) {
+ private List<DependentProcessDefinition>
checkDependentProcessDefinitionValid(
+ List<DependentProcessDefinition> dependentProcessDefinitionList,
CycleEnum processDefinitionCycle,
+ String workerGroup) {
List<DependentProcessDefinition> validDependentProcessDefinitionList =
new ArrayList<>();
- List<Long> processDefinitionCodeList =
dependentProcessDefinitionList.stream()
- .map(DependentProcessDefinition::getProcessDefinitionCode)
+ List<Long> processDefinitionCodeList =
+
dependentProcessDefinitionList.stream().map(DependentProcessDefinition::getProcessDefinitionCode)
.collect(Collectors.toList());
- Map<Long, String> processDefinitionWorkerGroupMap =
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
+ Map<Long, String> processDefinitionWorkerGroupMap =
+
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle() ==
processDefinitionCycle) {
- if
(processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode())
== null) {
+ if
(processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode())
+ == null) {
dependentProcessDefinition.setWorkerGroup(workerGroup);
}
@@ -902,52 +943,51 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
}
/**
- *
* @param schedule
* @return check error return 0 otherwise 1
*/
- private int checkScheduleTime(String schedule){
- Date start = null;
- Date end = null;
- Map<String,String> scheduleResult = JSONUtils.toMap(schedule);
- if(scheduleResult == null){
- return 0;
- }
-
if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
- if(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)
== null){
- return 0;
+ private boolean isValidateScheduleTime(String schedule) {
+ Map<String, String> scheduleResult = JSONUtils.toMap(schedule);
+ if (scheduleResult == null) {
+ return false;
+ }
+ if
(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+ if
(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null) {
+ return false;
}
}
- if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){
+ if (scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
String startDate =
scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
String endDate =
scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
if (startDate == null || endDate == null) {
- return 0;
- }
- start = DateUtils.getScheduleDate(startDate);
- end = DateUtils.getScheduleDate(endDate);
- if(start == null || end == null){
- return 0;
+ return false;
}
- if (start.after(end)) {
- logger.error("complement data error, wrong date start:{} and
end date:{} ",
- start, end
- );
- return 0;
+ try {
+ ZonedDateTime start =
DateUtils.stringToZoneDateTime(startDate);
+ ZonedDateTime end = DateUtils.stringToZoneDateTime(endDate);
+ if (start == null || end == null) {
+ return false;
+ }
+ if (start.isAfter(end)) {
+ logger.error("complement data error, wrong date start:{}
and end date:{} ", start, end);
+ return false;
+ }
+ } catch (Exception ex) {
+ logger.warn("Parse schedule time error, startDate: {},
endDate: {}", startDate, endDate);
+ return false;
}
}
- return 1;
+ return true;
}
/**
- *
* @param scheduleTimeList
* @return remove duplicate date list
*/
- private String removeDuplicates(String scheduleTimeList){
+ private String removeDuplicates(String scheduleTimeList) {
HashSet<String> removeDate = new HashSet<String>();
List<String> resultList = new ArrayList<String>();
- if(StringUtils.isNotEmpty(scheduleTimeList)){
+ if (StringUtils.isNotEmpty(scheduleTimeList)) {
String[] dateArrays = scheduleTimeList.split(COMMA);
List<String> dateList = Arrays.asList(dateArrays);
removeDate.addAll(dateList);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 38d5893247..19f4df3333 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -560,7 +560,7 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
private void setProcessInstance(ProcessInstance processInstance, String
tenantCode, String scheduleTime, String globalParams, int timeout, String
timezone) {
Date schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
- schedule = DateUtils.getScheduleDate(scheduleTime);
+ schedule = DateUtils.stringToDate(scheduleTime);
}
processInstance.setScheduleTime(schedule);
List<Property> globalParamList = JSONUtils.toList(globalParams,
Property.class);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 5457872d18..983efa7a11 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -46,28 +46,31 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
-import org.apache.dolphinscheduler.service.corn.CronUtils;
+import org.apache.dolphinscheduler.service.cron.CronUtils;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
-import java.text.ParseException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
-import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.cronutils.model.Cron;
/**
* scheduler service impl
@@ -108,16 +111,16 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* save schedule
*
- * @param loginUser login user
- * @param projectCode project name
- * @param processDefineCode process definition code
- * @param schedule scheduler
- * @param warningType warning type
- * @param warningGroupId warning group id
- * @param failureStrategy failure strategy
+ * @param loginUser login user
+ * @param projectCode project name
+ * @param processDefineCode process definition code
+ * @param schedule scheduler
+ * @param warningType warning type
+ * @param warningGroupId warning group id
+ * @param failureStrategy failure strategy
* @param processInstancePriority process instance priority
- * @param workerGroup worker group
- * @param environmentCode environment code
+ * @param workerGroup worker group
+ * @param environmentCode environment code
* @return create result code
*/
@Override
@@ -138,14 +141,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode);
// check project auth
- boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result,null);
+ boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) {
return result;
}
// check work flow define release state
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefineCode);
- result =
executorService.checkProcessDefinitionValid(projectCode,processDefinition,
processDefineCode, processDefinition.getVersion());
+ result = executorService.checkProcessDefinitionValid(projectCode,
processDefinition, processDefineCode,
+ processDefinition.getVersion());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -209,15 +213,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* updateProcessInstance schedule
*
- * @param loginUser login user
- * @param projectCode project code
- * @param id scheduler id
- * @param scheduleExpression scheduler
- * @param warningType warning type
- * @param warningGroupId warning group id
- * @param failureStrategy failure strategy
- * @param workerGroup worker group
- * @param environmentCode environment code
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param id scheduler id
+ * @param scheduleExpression scheduler
+ * @param warningType warning type
+ * @param warningGroupId warning group id
+ * @param failureStrategy failure strategy
+ * @param workerGroup worker group
+ * @param environmentCode environment code
* @param processInstancePriority process instance priority
* @return update result code
*/
@@ -238,7 +242,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode);
// check project auth
- boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result,null);
+ boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) {
return result;
}
@@ -265,9 +269,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* set schedule online or offline
*
- * @param loginUser login user
- * @param projectCode project code
- * @param id scheduler id
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param id scheduler id
* @param scheduleStatus schedule status
* @return publish result code
*/
@@ -281,7 +285,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode);
// check project auth
- boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result,null);
+ boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) {
return result;
}
@@ -296,7 +300,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.info("schedule release is already {},needn't to change
schedule id: {} from {} to {}",
- scheduleObj.getReleaseState(), scheduleObj.getId(),
scheduleObj.getReleaseState(), scheduleStatus);
+ scheduleObj.getReleaseState(), scheduleObj.getId(),
scheduleObj.getReleaseState(), scheduleStatus);
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE,
scheduleStatus);
return result;
}
@@ -313,8 +317,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
- logger.info("not release process definition id: {} , name :
{}",
- processDefinition.getId(),
processDefinition.getName());
+ logger.info("not release process definition id: {} , name :
{}", processDefinition.getId(),
+ processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
processDefinition.getName());
return result;
}
@@ -323,7 +327,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
processService.recurseFindSubProcess(processDefinition.getCode(),
subProcessDefineCodes);
if (!subProcessDefineCodes.isEmpty()) {
List<ProcessDefinition> subProcessDefinitionList =
-
processDefinitionMapper.queryByCodes(subProcessDefineCodes);
+
processDefinitionMapper.queryByCodes(subProcessDefineCodes);
if (subProcessDefinitionList != null &&
!subProcessDefinitionList.isEmpty()) {
for (ProcessDefinition subProcessDefinition :
subProcessDefinitionList) {
/**
@@ -331,8 +335,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
*/
if (subProcessDefinition.getReleaseState() !=
ReleaseState.ONLINE) {
logger.info("not release process definition id: {}
, name : {}",
- subProcessDefinition.getId(),
subProcessDefinition.getName());
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
+ subProcessDefinition.getId(),
subProcessDefinition.getName());
+ putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
+ String.valueOf(subProcessDefinition.getId()));
return result;
}
}
@@ -379,17 +384,17 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* query schedule
*
- * @param loginUser login user
- * @param projectCode project code
+ * @param loginUser login user
+ * @param projectCode project code
* @param processDefineCode process definition code
- * @param pageNo page number
- * @param pageSize page size
- * @param searchVal search value
+ * @param pageNo page number
+ * @param pageSize page size
+ * @param searchVal search value
* @return schedule list page
*/
@Override
public Result querySchedule(User loginUser, long projectCode, long
processDefineCode, String searchVal,
- Integer pageNo, Integer pageSize)
{
+ Integer pageNo, Integer pageSize) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
@@ -407,8 +412,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
}
Page<Schedule> page = new Page<>(pageNo, pageSize);
- IPage<Schedule> scheduleIPage =
scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode,
- searchVal);
+ IPage<Schedule> scheduleIPage =
+ scheduleMapper.queryByProcessDefineCodePaging(page,
processDefineCode, searchVal);
List<ScheduleVo> scheduleList = new ArrayList<>();
for (Schedule schedule : scheduleIPage.getRecords()) {
@@ -426,7 +431,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* query schedule list
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
* @return schedule list
*/
@@ -436,7 +441,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode);
// check project auth
- boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result,null);
+ boolean hasProjectAndPerm =
projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) {
return result;
}
@@ -461,7 +466,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* delete schedule
*
- * @param projectId project id
+ * @param projectId project id
* @param scheduleId schedule id
* @throws RuntimeException runtime exception
*/
@@ -475,7 +480,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
* check valid
*
* @param result result
- * @param bool bool
+ * @param bool bool
* @param status status
* @return check result code
*/
@@ -491,9 +496,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* delete schedule by id
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param scheduleId schedule id
+ * @param scheduleId schedule id
* @return delete result code
*/
@Override
@@ -502,7 +507,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
- Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectCode,null);
+ Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
@@ -516,8 +521,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
}
// Determine if the login user is the owner of the schedule
- if (loginUser.getId() != schedule.getUserId()
- && loginUser.getUserType() != UserType.ADMIN_USER) {
+ if (loginUser.getId() != schedule.getUserId() &&
loginUser.getUserType() != UserType.ADMIN_USER) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -542,30 +546,32 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
* preview schedule
*
* @param loginUser login user
- * @param schedule schedule expression
+ * @param schedule schedule expression
* @return the next five fire time
*/
@Override
public Map<String, Object> previewSchedule(User loginUser, String
schedule) {
Map<String, Object> result = new HashMap<>();
- CronExpression cronExpression;
+ Cron cron;
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule,
ScheduleParam.class);
- Date now = new Date();
- Date startTime =
DateUtils.transformTimezoneDate(scheduleParam.getStartTime(),
scheduleParam.getTimezoneId());
- Date endTime =
DateUtils.transformTimezoneDate(scheduleParam.getEndTime(),
scheduleParam.getTimezoneId());
- startTime = now.after(startTime) ? now : startTime;
+ ZoneId zoneId =
TimeZone.getTimeZone(scheduleParam.getTimezoneId()).toZoneId();
+ ZonedDateTime now = ZonedDateTime.now(zoneId);
+ ZonedDateTime startTime =
ZonedDateTime.ofInstant(scheduleParam.getStartTime().toInstant(), zoneId);
+ ZonedDateTime endTime =
ZonedDateTime.ofInstant(scheduleParam.getEndTime().toInstant(), zoneId);
+ startTime = now.isAfter(startTime) ? now : startTime;
try {
- cronExpression =
CronUtils.parse2CronExpression(scheduleParam.getCrontab());
- } catch (ParseException e) {
+ cron = CronUtils.parse2Cron(scheduleParam.getCrontab());
+ } catch (CronParseException e) {
logger.error(e.getMessage(), e);
putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
return result;
}
- List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime,
endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
- List<String> previewDateList = new ArrayList<>();
- selfFireDateList.forEach(date ->
previewDateList.add(DateUtils.dateToString(date,
scheduleParam.getTimezoneId())));
+ List<ZonedDateTime> selfFireDateList =
+ CronUtils.getSelfFireDateList(startTime, endTime, cron,
Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
+ List<String> previewDateList =
+
selfFireDateList.stream().map(DateUtils::dateToString).collect(Collectors.toList());
result.put(Constants.DATA_LIST, previewDateList);
putMsg(result, Status.SUCCESS);
return result;
@@ -574,14 +580,14 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
/**
* update process definition schedule
*
- * @param loginUser login user
- * @param projectCode project code
- * @param processDefinitionCode process definition code
- * @param scheduleExpression scheduleExpression
- * @param warningType warning type
- * @param warningGroupId warning group id
- * @param failureStrategy failure strategy
- * @param workerGroup worker group
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param processDefinitionCode process definition code
+ * @param scheduleExpression scheduleExpression
+ * @param warningType warning type
+ * @param warningGroupId warning group id
+ * @param failureStrategy failure strategy
+ * @param workerGroup worker group
* @param processInstancePriority process instance priority
* @return update result code
*/
@@ -598,7 +604,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
long
environmentCode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
- Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,null);
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -619,17 +625,12 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
return result;
}
- private void updateSchedule(Map<String, Object> result,
- Schedule schedule,
- ProcessDefinition processDefinition,
- String scheduleExpression,
- WarningType warningType,
- int warningGroupId,
- FailureStrategy failureStrategy,
- Priority processInstancePriority,
- String workerGroup,
+ private void updateSchedule(Map<String, Object> result, Schedule schedule,
ProcessDefinition processDefinition,
+ String scheduleExpression, WarningType
warningType, int warningGroupId,
+ FailureStrategy failureStrategy, Priority
processInstancePriority, String workerGroup,
long environmentCode) {
- if (checkValid(result, schedule.getReleaseState() ==
ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
+ if (checkValid(result, schedule.getReleaseState() ==
ReleaseState.ONLINE,
+ Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
return;
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java
index 807c89a60f..c0cf9ad3ef 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java
@@ -61,8 +61,8 @@ public class AuditServiceTest {
@Test
public void testQueryLogListPaging() {
- Date start = DateUtils.getScheduleDate("2020-11-01 00:00:00");
- Date end = DateUtils.getScheduleDate("2020-11-02 00:00:00");
+ Date start = DateUtils.stringToDate("2020-11-01 00:00:00");
+ Date end = DateUtils.stringToDate("2020-11-02 00:00:00");
IPage<AuditLog> page = new Page<>(1, 10);
page.setRecords(getLists());
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
index 7d2fb77de7..34df74d697 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
@@ -17,9 +17,16 @@
package org.apache.dolphinscheduler.api.service;
-import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
+import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT_OVERVIEW;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+
import org.apache.dolphinscheduler.api.dto.CommandStateCount;
import org.apache.dolphinscheduler.api.enums.Status;
+import
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.DataAnalysisServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
@@ -39,7 +46,16 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -53,21 +69,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT_OVERVIEW;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-
/**
* data analysis service test
*/
@@ -142,10 +143,14 @@ public class DataAnalysisServiceTest {
Mockito.when(projectMapper.queryByCode(1L)).thenReturn(getProject("test"));
//SUCCESS
-
Mockito.when(taskInstanceMapper.countTaskInstanceStateByProjectCodes(DateUtils.getScheduleDate(startDate),
- DateUtils.getScheduleDate(endDate), new
Long[]{1L})).thenReturn(getTaskInstanceStateCounts());
+
Mockito.when(taskInstanceMapper.countTaskInstanceStateByProjectCodes(DateUtils.stringToDate(startDate),
+ DateUtils.stringToDate(endDate),
+ new Long[] {1L})).thenReturn(getTaskInstanceStateCounts());
Mockito.when(projectMapper.selectById(Mockito.any())).thenReturn(getProject("test"));
- Mockito.when(projectService.hasProjectAndPerm(Mockito.any(),
Mockito.any(), (Map<String,
Object>)Mockito.any(),Mockito.any())).thenReturn(true);
+ Mockito.when(projectService.hasProjectAndPerm(Mockito.any(),
+ Mockito.any(),
+ (Map<String, Object>) Mockito.any(),
+ Mockito.any())).thenReturn(true);
result = dataAnalysisServiceImpl.countTaskStateByProject(user, 1,
startDate, endDate);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
@@ -236,18 +241,22 @@ public class DataAnalysisServiceTest {
//checkProject false
Map<String, Object> failResult = new HashMap<>();
putMsg(failResult, Status.PROJECT_NOT_FOUND, 1);
- Mockito.when(projectService.checkProjectAndAuth(any(), any(),
anyLong(),any())).thenReturn(failResult);
+ Mockito.when(projectService.checkProjectAndAuth(any(), any(),
anyLong(), any())).thenReturn(failResult);
failResult =
dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate,
endDate);
Assert.assertEquals(Status.PROJECT_NOT_FOUND,
failResult.get(Constants.STATUS));
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, null);
- Mockito.when(projectService.checkProjectAndAuth(any(), any(),
anyLong(),any())).thenReturn(result);
+ Mockito.when(projectService.checkProjectAndAuth(any(), any(),
anyLong(), any())).thenReturn(result);
//SUCCESS
-
Mockito.when(processInstanceMapper.countInstanceStateByProjectCodes(DateUtils.getScheduleDate(startDate),
- DateUtils.getScheduleDate(endDate), new
Long[]{1L})).thenReturn(getTaskInstanceStateCounts());
- Mockito.when(projectService.hasProjectAndPerm(Mockito.any(),
Mockito.any(), (Map<String,
Object>)Mockito.any(),Mockito.any())).thenReturn(true);
+
Mockito.when(processInstanceMapper.countInstanceStateByProjectCodes(DateUtils.stringToDate(startDate),
+ DateUtils.stringToDate(endDate),
+ new Long[] {1L})).thenReturn(getTaskInstanceStateCounts());
+ Mockito.when(projectService.hasProjectAndPerm(Mockito.any(),
+ Mockito.any(),
+ (Map<String, Object>) Mockito.any(),
+ Mockito.any())).thenReturn(true);
result =
dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate,
endDate);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java
index 91e36c732a..cf380a33bd 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java
@@ -73,8 +73,8 @@ public class DqExecuteResultServiceTest {
String searchVal = "";
int ruleType = 0;
- Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
- Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
+ Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
+ Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
User loginUser = new User();
loginUser.setId(1);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java
index 1a2358a642..d82a22cd5b 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java
@@ -128,8 +128,8 @@ public class DqRuleServiceTest {
String searchVal = "";
int ruleType = 0;
- Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
- Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
+ Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
+ Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
User loginUser = new User();
loginUser.setId(1);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index 039da1e397..dde556e453 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -171,8 +171,8 @@ public class ProcessInstanceServiceTest {
"192.168.xx.xx", "",1, 10);
Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int)
proejctAuthFailRes.getCode());
- Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
- Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
+ Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
+ Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance();
List<ProcessInstance> processInstanceList = new ArrayList<>();
Page<ProcessInstance> pageReturn = new Page<>(1, 10);
@@ -246,8 +246,8 @@ public class ProcessInstanceServiceTest {
int size = 10;
String startTime = "2020-01-01 00:00:00";
String endTime = "2020-08-02 00:00:00";
- Date start = DateUtils.getScheduleDate(startTime);
- Date end = DateUtils.getScheduleDate(endTime);
+ Date start = DateUtils.stringToDate(startTime);
+ Date end = DateUtils.stringToDate(endTime);
//project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index a2f1c539eb..0ce57acf13 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
+
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@@ -99,22 +100,44 @@ public class TaskInstanceServiceTest {
//project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_INSTANCE)).thenReturn(result);
- Result projectAuthFailRes =
taskInstanceService.queryTaskListPaging(loginUser, projectCode, 0, "", "",
- "test_user", "2019-02-26 19:48:00", "2019-02-26 19:48:22", "",
null, "", 1, 20);
- Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(),
(int)projectAuthFailRes.getCode());
+ Result projectAuthFailRes =
taskInstanceService.queryTaskListPaging(loginUser,
+ projectCode,
+ 0,
+ "",
+ "",
+ "test_user",
+ "2019-02-26 19:48:00",
+ "2019-02-26 19:48:22",
+ "",
+ null,
+ "",
+ 1,
+ 20);
+ Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int)
projectAuthFailRes.getCode());
// data parameter check
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
- when(projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_INSTANCE)).thenReturn(result);
- Result dataParameterRes =
taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "",
- "test_user", "20200101 00:00:00", "2020-01-02 00:00:00", "",
ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
- Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(),
(int)dataParameterRes.getCode());
+ when(projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_INSTANCE)).thenReturn(result);
+ Result dataParameterRes =
taskInstanceService.queryTaskListPaging(loginUser,
+ projectCode,
+ 1,
+ "",
+ "",
+ "test_user",
+ "20200101 00:00:00",
+ "2020-01-02 00:00:00",
+ "",
+ ExecutionStatus.SUCCESS,
+ "192.168.xx.xx",
+ 1,
+ 20);
+ Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(),
(int) dataParameterRes.getCode());
//project
putMsg(result, Status.SUCCESS, projectCode);
- Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
- Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
+ Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
+ Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance();
TaskInstance taskInstance = getTaskInstance();
List<TaskInstance> taskInstanceList = new ArrayList<>();
@@ -122,7 +145,7 @@ public class TaskInstanceServiceTest {
taskInstanceList.add(taskInstance);
pageReturn.setRecords(taskInstanceList);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
- when(projectService.checkProjectAndAuth(loginUser, project,
projectCode,TASK_INSTANCE)).thenReturn(result);
+ when(projectService.checkProjectAndAuth(loginUser, project,
projectCode, TASK_INSTANCE)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class),
eq(project.getCode()), eq(1), eq(""), eq(""), eq(""),
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
index 3537948410..f01f440240 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
@@ -31,6 +31,9 @@ import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +48,8 @@ public final class DateUtils {
static final long C6 = C5 * 24L;
private static final Logger logger =
LoggerFactory.getLogger(DateUtils.class);
- private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS =
DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS);
+ private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS =
+ DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS);
private DateUtils() {
throw new UnsupportedOperationException("Construct DateUtils");
@@ -66,7 +70,7 @@ public final class DateUtils {
/**
* date to local datetime
*
- * @param date date
+ * @param date date
* @param zoneId zoneId
* @return local datetime
*/
@@ -119,7 +123,8 @@ public final class DateUtils {
}
public static String format(Date date, DateTimeFormatter
dateTimeFormatter, String timezone) {
- LocalDateTime localDateTime = StringUtils.isEmpty(timezone) ?
date2LocalDateTime(date) : date2LocalDateTime(date, ZoneId.of(timezone));
+ LocalDateTime localDateTime =
+ StringUtils.isEmpty(timezone) ? date2LocalDateTime(date) :
date2LocalDateTime(date, ZoneId.of(timezone));
return format(localDateTime, dateTimeFormatter);
}
@@ -151,7 +156,7 @@ public final class DateUtils {
/**
* convert time to yyyy-MM-dd HH:mm:ss format
*
- * @param date date
+ * @param date date
* @param timezone timezone
* @return date string
*/
@@ -159,11 +164,15 @@ public final class DateUtils {
return format(date, YYYY_MM_DD_HH_MM_SS, timezone);
}
+ public static String dateToString(ZonedDateTime zonedDateTime) {
+ return YYYY_MM_DD_HH_MM_SS.format(zonedDateTime);
+ }
+
/**
* convert string to date and time
*
- * @param date date
- * @param format format
+ * @param date date
+ * @param format format
* @param timezone timezone, if null, use system default timezone
* @return date
*/
@@ -184,20 +193,39 @@ public final class DateUtils {
return null;
}
+ public static ZonedDateTime parseZoneDateTime(@Nonnull String date,
@Nonnull DateTimeFormatter dateTimeFormatter,
+ @Nullable String timezone) {
+ ZonedDateTime zonedDateTime = ZonedDateTime.parse(date,
dateTimeFormatter);
+ if (StringUtils.isNotEmpty(timezone)) {
+ return zonedDateTime.withZoneSameInstant(ZoneId.of(timezone));
+ }
+ return zonedDateTime;
+ }
+
/**
* convert date str to yyyy-MM-dd HH:mm:ss format
*
* @param date date string
* @return yyyy-MM-dd HH:mm:ss format
*/
- public static Date stringToDate(String date) {
+ public static @Nullable Date stringToDate(String date) {
return parse(date, YYYY_MM_DD_HH_MM_SS, null);
}
+ /**
+ * Convert a yyyy-MM-dd HH:mm:ss date string to a {@link ZonedDateTime} in the system default zone.
+ *
+ * @param date date string
+ * @return the parsed instant expressed in {@link ZoneId#systemDefault()}
+ * @throws IllegalArgumentException if {@link #stringToDate(String)} returns null for the given string
+ */
+ public static ZonedDateTime stringToZoneDateTime(@Nonnull String date) {
+ Date d = stringToDate(date);
+ if (d == null) {
+ // stringToDate is @Nullable; turn the null into an explicit, descriptive failure
+ throw new IllegalArgumentException(String.format(
+ "date: %s should be a valid date string - yyyy-MM-dd HH:mm:ss",
+ date));
+ }
+ return ZonedDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault());
+ }
+
/**
* convert date str to yyyy-MM-dd HH:mm:ss format
*
- * @param date date string
+ * @param date date string
* @param timezone
* @return yyyy-MM-dd HH:mm:ss format
*/
@@ -267,16 +295,6 @@ public final class DateUtils {
return future.getTime() > old.getTime();
}
- /**
- * convert schedule string to date
- *
- * @param schedule schedule
- * @return convert schedule string to date
- */
- public static Date getScheduleDate(String schedule) {
- return stringToDate(schedule);
- }
-
/**
* format time to readable
*
@@ -554,8 +572,10 @@ public final class DateUtils {
return date;
}
String dateToString = dateToString(date, sourceTimezoneId);
- LocalDateTime localDateTime = LocalDateTime.parse(dateToString,
DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS));
- ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime,
TimeZone.getTimeZone(targetTimezoneId).toZoneId());
+ LocalDateTime localDateTime =
+ LocalDateTime.parse(dateToString,
DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS));
+ ZonedDateTime zonedDateTime =
+ ZonedDateTime.of(localDateTime,
TimeZone.getTimeZone(targetTimezoneId).toZoneId());
return Date.from(zonedDateTime.toInstant());
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index fdec38e65e..525aa6154e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -76,7 +76,8 @@ import
org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
-import org.apache.dolphinscheduler.service.corn.CronUtils;
+import org.apache.dolphinscheduler.service.cron.CronUtils;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
@@ -323,15 +324,14 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
public String getKey() {
- if (StringUtils.isNotEmpty(key)
- || this.processDefinition == null) {
+ if (StringUtils.isNotEmpty(key) || this.processDefinition == null) {
return key;
}
key = String.format("%d_%d_%d",
- this.processDefinition.getCode(),
- this.processDefinition.getVersion(),
- this.processInstance.getId());
+ this.processDefinition.getCode(),
+ this.processDefinition.getVersion(),
+ this.processInstance.getId());
return key;
}
@@ -443,8 +443,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
this.stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance =
this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
- this.processService.sendStartTask2Master(processInstance,
nextTaskInstance.getId(),
-
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+ this.processService.sendStartTask2Master(processInstance,
nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
@@ -468,13 +467,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(),
newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
- logger.info("failure task will be submitted: process id: {}, task
instance code: {} state:{} retry times:{} / {}, interval:{}",
- processInstance.getId(),
- newTaskInstance.getTaskCode(),
- newTaskInstance.getState(),
- newTaskInstance.getRetryTimes(),
- newTaskInstance.getMaxRetryTimes(),
- newTaskInstance.getRetryInterval());
+ logger.info("failure task will be submitted: process id: {}, task
instance code: {} state:{} retry times:{} / {}, interval:{}",
processInstance.getId(), newTaskInstance.getTaskCode(),
+ newTaskInstance.getState(), newTaskInstance.getRetryTimes(),
newTaskInstance.getMaxRetryTimes(), newTaskInstance.getRetryInterval());
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance,
newTaskInstance);
stateWheelExecuteThread.addTask4RetryCheck(processInstance,
newTaskInstance);
} else {
@@ -490,8 +484,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
public void refreshProcessInstance(int processInstanceId) {
logger.info("process instance update: {}", processInstanceId);
processInstance =
processService.findProcessInstanceById(processInstanceId);
- processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
}
@@ -612,10 +605,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
// complement data ends || no success
return true;
}
- logger.info("process complement continue. process id:{}, schedule
time:{} complementListDate:{}",
- processInstance.getId(),
- processInstance.getScheduleTime(),
- complementListDate);
+ logger.info("process complement continue. process id:{}, schedule
time:{} complementListDate:{}", processInstance.getId(),
processInstance.getScheduleTime(), complementListDate);
scheduleDate = complementListDate.get(index + 1);
}
//the next process complement
@@ -663,8 +653,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
private boolean needComplementProcess() {
- if (processInstance.isComplementData()
- && Flag.NO == processInstance.getIsSubProcess()) {
+ if (processInstance.isComplementData() && Flag.NO ==
processInstance.getIsSubProcess()) {
return true;
}
return false;
@@ -719,11 +708,13 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
public void checkSerialProcess(ProcessDefinition processDefinition) {
int nextInstanceId = processInstance.getNextProcessInstanceId();
if (nextInstanceId == 0) {
- ProcessInstance nextProcessInstance =
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),
ExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId());
+ ProcessInstance nextProcessInstance =
+
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),
ExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId());
if (nextProcessInstance == null) {
return;
}
- ProcessInstance nextReadyStopProcessInstance =
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),
ExecutionStatus.READY_STOP.getCode(), processInstance.getId());
+ ProcessInstance nextReadyStopProcessInstance =
+
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),
ExecutionStatus.READY_STOP.getCode(), processInstance.getId());
if (processDefinition.getExecutionType().typeIsSerialPriority() &&
nextReadyStopProcessInstance != null) {
return;
}
@@ -753,8 +744,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
if (this.dag != null) {
return;
}
- processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList =
getRecoverTaskInstanceList(processInstance.getCommandParam());
@@ -773,8 +763,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
// generate process to get DAG info
List<String> recoveryNodeCodeList =
getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList =
parseStartNodeName(processInstance.getCommandParam());
- ProcessDag processDag = generateFlowDag(taskNodeList,
- startNodeNameList, recoveryNodeCodeList,
processInstance.getTaskDependType());
+ ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
return;
@@ -786,7 +775,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
/**
* init task queue
*/
- private void initTaskQueue() throws StateEventHandleException {
+ private void initTaskQueue() throws StateEventHandleException,
CronParseException {
taskFailedSubmit = false;
activeTaskProcessorMaps.clear();
@@ -856,15 +845,13 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
if
(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
complementListDate =
CronUtils.getSelfScheduleDateList(cmdParam);
}
- logger.info(" process definition code:{} complement data:
{}",
- processInstance.getProcessDefinitionCode(),
complementListDate);
+ logger.info(" process definition code:{} complement data:
{}", processInstance.getProcessDefinitionCode(), complementListDate);
if (!complementListDate.isEmpty() && Flag.NO ==
processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
- String globalParams =
curingParamsService.curingGlobalParams(processInstance.getId(),
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));
+ String globalParams =
+
curingParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA,
+ processInstance.getScheduleTime(),
cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance);
}
@@ -887,16 +874,13 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
ITaskProcessor taskProcessor =
TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
taskProcessor.init(taskInstance, processInstance);
- if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
- &&
taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
+ if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
- logger.error("process id:{} name:{} submit standby task id:{}
name:{} failed!",
- processInstance.getId(), processInstance.getName(),
- taskInstance.getId(), taskInstance.getName());
+ logger.error("process id:{} name:{} submit standby task id:{}
name:{} failed!", processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return Optional.empty();
}
@@ -1373,11 +1357,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
* @return ExecutionStatus
*/
private ExecutionStatus runningState(ExecutionStatus state) {
- if (state == ExecutionStatus.READY_STOP
- || state == ExecutionStatus.READY_PAUSE
- || state == ExecutionStatus.WAITING_THREAD
- || state == ExecutionStatus.READY_BLOCK
- || state == ExecutionStatus.DELAY_EXECUTION) {
+ if (state == ExecutionStatus.READY_STOP || state ==
ExecutionStatus.READY_PAUSE || state == ExecutionStatus.WAITING_THREAD || state
== ExecutionStatus.READY_BLOCK ||
+ state == ExecutionStatus.DELAY_EXECUTION) {
// if the running task is not completed, the state remains
unchanged
return state;
} else {
@@ -1412,9 +1393,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
return true;
}
if (processInstance.getFailureStrategy() ==
FailureStrategy.CONTINUE) {
- return readyToSubmitTaskQueue.size() == 0
- && activeTaskProcessorMaps.size() == 0
- && waitToRetryTaskInstanceMap.size() == 0;
+ return readyToSubmitTaskQueue.size() == 0 &&
activeTaskProcessorMaps.size() == 0 && waitToRetryTaskInstanceMap.size() == 0;
}
}
return false;
@@ -1444,10 +1423,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
List<TaskInstance> pauseList =
getCompleteTaskByState(ExecutionStatus.PAUSE);
- if (CollectionUtils.isNotEmpty(pauseList)
- || processInstance.isBlocked()
- || !isComplementEnd()
- || readyToSubmitTaskQueue.size() > 0) {
+ if (CollectionUtils.isNotEmpty(pauseList) ||
processInstance.isBlocked() || !isComplementEnd() ||
readyToSubmitTaskQueue.size() > 0) {
return ExecutionStatus.PAUSE;
} else {
return ExecutionStatus.SUCCESS;
@@ -1511,10 +1487,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
List<TaskInstance> stopList =
getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList =
getCompleteTaskByState(ExecutionStatus.KILL);
List<TaskInstance> failList =
getCompleteTaskByState(ExecutionStatus.FAILURE);
- if (CollectionUtils.isNotEmpty(stopList)
- || CollectionUtils.isNotEmpty(killList)
- || CollectionUtils.isNotEmpty(failList)
- || !isComplementEnd()) {
+ if (CollectionUtils.isNotEmpty(stopList) ||
CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) ||
!isComplementEnd()) {
return ExecutionStatus.STOP;
} else {
return ExecutionStatus.SUCCESS;
@@ -1555,7 +1528,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
Map<String, String> cmdParam =
JSONUtils.toMap(processInstance.getCommandParam());
- Date endTime =
DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+ Date endTime =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
return processInstance.getScheduleTime().equals(endTime);
}
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
index bd05a15f45..8a3f18b555 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-service/pom.xml
@@ -55,24 +55,6 @@
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
</dependency>
- <dependency>
- <groupId>org.quartz-scheduler</groupId>
- <artifactId>quartz</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.mchange</groupId>
- <artifactId>c3p0</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.mchange</groupId>
- <artifactId>mchange-commons-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.zaxxer</groupId>
- <artifactId>HikariCP-java7</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>io.micrometer</groupId>
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java
similarity index 99%
rename from
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java
rename to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java
index 8e69c458c4..aa890af99d 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.corn;
+package org.apache.dolphinscheduler.service.cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
similarity index 59%
rename from
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
rename to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
index e25569236e..da62ba47ef 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.corn;
+package org.apache.dolphinscheduler.service.cron;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.day;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.min;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.month;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.week;
-import static org.apache.dolphinscheduler.service.corn.CycleFactory.year;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.day;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.hour;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.min;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.month;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.week;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.year;
import static com.cronutils.model.CronType.QUARTZ;
@@ -33,27 +33,32 @@ import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
-import java.text.ParseException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
-import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
+import lombok.NonNull;
+
/**
* // todo: this utils is heavy, it rely on quartz and corn-utils.
* cron utils
@@ -74,19 +79,12 @@ public class CronUtils {
* @param cronExpression cron expression, never null
* @return Cron instance, corresponding to cron expression received
*/
- public static Cron parse2Cron(String cronExpression) {
- return QUARTZ_CRON_PARSER.parse(cronExpression);
- }
-
- /**
- * build a new CronExpression based on the string cronExpression
- *
- * @param cronExpression String representation of the cron expression the new object should represent
- * @return CronExpression
- * @throws ParseException if the string expression cannot be parsed into a valid
- */
- public static CronExpression parse2CronExpression(String cronExpression)
throws ParseException {
- return new CronExpression(cronExpression);
+ public static Cron parse2Cron(String cronExpression) throws
CronParseException {
+ try {
+ return QUARTZ_CRON_PARSER.parse(cronExpression);
+ } catch (Exception ex) {
+ throw new CronParseException(String.format("Parse corn expression:
[%s] error", cronExpression), ex);
+ }
}
/**
@@ -106,7 +104,12 @@ public class CronUtils {
* @return CycleEnum
*/
public static CycleEnum getMiniCycle(Cron cron) {
- return
min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).addCycle(year(cron)).getMiniCycle();
+ return min(cron).addCycle(hour(cron))
+ .addCycle(day(cron))
+ .addCycle(week(cron))
+ .addCycle(month(cron))
+ .addCycle(year(cron))
+ .getMiniCycle();
}
/**
@@ -116,23 +119,41 @@ public class CronUtils {
* @return CycleEnum
*/
public static CycleEnum getMaxCycle(String crontab) {
- return getMaxCycle(parse2Cron(crontab));
+ try {
+ return getMaxCycle(parse2Cron(crontab));
+ } catch (CronParseException ex) {
+ throw new RuntimeException("Get max cycle error", ex);
+ }
+ }
+
+
+ public static List<ZonedDateTime> getFireDateList(@NonNull ZonedDateTime
startTime,
+ @NonNull ZonedDateTime
endTime,
+ @NonNull String cron)
throws CronParseException {
+ return getFireDateList(startTime, endTime, parse2Cron(cron));
}
/**
* gets all scheduled times for a period of time based on not self dependency
*
* @param startTime startTime
- * @param endTime endTime
- * @param cronExpression cronExpression
+ * @param endTime endTime
+ * @param cron cron
* @return date list
*/
- public static List<Date> getFireDateList(Date startTime, Date endTime,
CronExpression cronExpression) {
- List<Date> dateList = new ArrayList<>();
+ public static List<ZonedDateTime> getFireDateList(@NonNull ZonedDateTime
startTime,
+ @NonNull ZonedDateTime
endTime,
+ @NonNull Cron cron) {
+ List<ZonedDateTime> dateList = new ArrayList<>();
+ ExecutionTime executionTime = ExecutionTime.forCron(cron);
while (Stopper.isRunning()) {
- startTime = cronExpression.getNextValidTimeAfter(startTime);
- if (startTime == null || startTime.after(endTime)) {
+ Optional<ZonedDateTime> nextExecutionTimeOptional =
executionTime.nextExecution(startTime);
+ if (!nextExecutionTimeOptional.isPresent()) {
+ break;
+ }
+ startTime = nextExecutionTimeOptional.get();
+ if (startTime.isAfter(endTime)) {
break;
}
dateList.add(startTime);
@@ -142,64 +163,62 @@ public class CronUtils {
}
/**
- * gets expect scheduled times for a period of time based on self dependency
+ * Gets expect scheduled times for a period of time based on self dependency
*
* @param startTime startTime
- * @param endTime endTime
- * @param cronExpression cronExpression
+ * @param endTime endTime
+ * @param cron cron
* @param fireTimes fireTimes
- * @return date list
+ * @return nextTime execution list
*/
- public static List<Date> getSelfFireDateList(Date startTime, Date endTime,
CronExpression cronExpression, int fireTimes) {
- List<Date> dateList = new ArrayList<>();
+ public static List<ZonedDateTime> getSelfFireDateList(@NonNull
ZonedDateTime startTime,
+ @NonNull
ZonedDateTime endTime, @NonNull Cron cron,
+ int fireTimes) {
+ List<ZonedDateTime> executeTimes = new ArrayList<>();
+ ExecutionTime executionTime = ExecutionTime.forCron(cron);
while (fireTimes > 0) {
- startTime = cronExpression.getNextValidTimeAfter(startTime);
- if (startTime == null || startTime.after(endTime) ||
startTime.equals(endTime)) {
+ Optional<ZonedDateTime> nextTime =
executionTime.nextExecution(startTime);
+ if (!nextTime.isPresent()) {
break;
}
- dateList.add(startTime);
+ startTime = nextTime.get();
+ if (startTime.isAfter(endTime)) {
+ break;
+ }
+ executeTimes.add(startTime);
fireTimes--;
}
-
- return dateList;
+ return executeTimes;
}
- /**
- * gets all scheduled times for a period of time based on self dependency
- *
- * @param startTime startTime
- * @param endTime endTime
- * @param cronExpression cronExpression
- * @return date list
- */
- public static List<Date> getSelfFireDateList(Date startTime, Date endTime,
CronExpression cronExpression) {
- List<Date> dateList = new ArrayList<>();
-
- while (Stopper.isRunning()) {
- startTime = cronExpression.getNextValidTimeAfter(startTime);
- if (startTime == null || startTime.after(endTime) ||
startTime.equals(endTime)) {
- break;
- }
- dateList.add(startTime);
- }
+ public static List<Date> getSelfFireDateList(@NonNull final Date startTime,
+ @NonNull final Date endTime,
+ @NonNull final List<Schedule>
schedules) throws CronParseException {
+ ZonedDateTime zonedDateTimeStart =
ZonedDateTime.ofInstant(startTime.toInstant(), ZoneId.systemDefault());
+ ZonedDateTime zonedDateTimeEnd =
ZonedDateTime.ofInstant(endTime.toInstant(), ZoneId.systemDefault());
- return dateList;
+ return getSelfFireDateList(zonedDateTimeStart, zonedDateTimeEnd,
schedules).stream()
+ .map(zonedDateTime -> new
Date(zonedDateTime.toInstant().toEpochMilli()))
+ .collect(Collectors.toList());
}
/**
* gets all scheduled times for a period of time based on self dependency
* if schedulers is empty then default scheduler = 1 day
*/
- public static List<Date> getSelfFireDateList(final Date startTime, final
Date endTime, final List<Schedule> schedules) {
- List<Date> result = new ArrayList<>();
+ public static List<ZonedDateTime> getSelfFireDateList(@NonNull final
ZonedDateTime startTime,
+ @NonNull final
ZonedDateTime endTime,
+ @NonNull final
List<Schedule> schedules)
+ throws CronParseException {
+ List<ZonedDateTime> result = new ArrayList<>();
if (startTime.equals(endTime)) {
result.add(startTime);
return result;
}
// support left closed and right closed time interval (startDate <= N <= endDate)
- Date from = new Date(startTime.getTime() -
Constants.SECOND_TIME_MILLIS);
- Date to = new Date(endTime.getTime() + Constants.SECOND_TIME_MILLIS);
+ ZonedDateTime from = startTime.minusSeconds(1L);
+ ZonedDateTime to = endTime.plusSeconds(1L);
List<Schedule> listSchedule = new ArrayList<>();
listSchedule.addAll(schedules);
@@ -209,30 +228,11 @@ public class CronUtils {
listSchedule.add(schedule);
}
for (Schedule schedule : listSchedule) {
- result.addAll(CronUtils.getSelfFireDateList(from, to,
schedule.getCrontab()));
+ result.addAll(CronUtils.getFireDateList(from, to,
schedule.getCrontab()));
}
return result;
}
- /**
- * gets all scheduled times for a period of time based on self dependency
- *
- * @param startTime startTime
- * @param endTime endTime
- * @param cron cron
- * @return date list
- */
- public static List<Date> getSelfFireDateList(Date startTime, Date endTime,
String cron) {
- CronExpression cronExpression = null;
- try {
- cronExpression = parse2CronExpression(cron);
- } catch (ParseException e) {
- logger.error(e.getMessage(), e);
- return Collections.emptyList();
- }
- return getSelfFireDateList(startTime, endTime, cronExpression);
- }
-
/**
* get expiration time
*
@@ -289,8 +289,9 @@ public class CronUtils {
/**
* get Schedule Date
+ *
* @param param
- * @return date list
+ * @return date list
*/
public static List<Date> getSelfScheduleDateList(Map<String, String>
param) {
List<Date> result = new ArrayList<>();
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java
similarity index 99%
rename from
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java
rename to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java
index 1a133ee7ea..00ca1029c6 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.corn;
+package org.apache.dolphinscheduler.service.cron;
import com.cronutils.model.Cron;
import com.cronutils.model.field.expression.Always;
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java
similarity index 97%
rename from
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java
rename to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java
index 7cc4a87f07..d4011bcd74 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.corn;
+package org.apache.dolphinscheduler.service.cron;
import com.cronutils.model.Cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java
new file mode 100644
index 0000000000..cfc46d7572
--- /dev/null
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.exceptions;
+
+public class CronParseException extends Exception {
+
+ public CronParseException(String message) {
+ super(message);
+ }
+
+ public CronParseException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 6558940673..f6eab1befa 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date;
@@ -61,7 +62,7 @@ import
org.springframework.transaction.annotation.Transactional;
public interface ProcessService {
@Transactional
- ProcessInstance handleCommand(String host, Command command);
+ ProcessInstance handleCommand(String host, Command command) throws
CronParseException;
void moveToErrorCommand(Command command, String message);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 08921e72a7..f9d0782f95 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.service.process;
-import io.micrometer.core.annotation.Counted;
-import static java.util.stream.Collectors.toSet;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@@ -33,6 +31,8 @@ import static
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+import static java.util.stream.Collectors.toSet;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -51,7 +51,6 @@ import
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import
org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
@@ -130,12 +129,14 @@ import
org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.corn.CronUtils;
+import org.apache.dolphinscheduler.service.cron.CronUtils;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -152,7 +153,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -166,6 +166,8 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+
/**
* process relative dao that some mappers in this.
*/
@@ -277,14 +279,13 @@ public class ProcessServiceImpl implements ProcessService
{
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
- * @param logger logger
* @param host host
* @param command found command
* @return process instance
*/
@Override
@Transactional
- public ProcessInstance handleCommand(String host, Command command) {
+ public ProcessInstance handleCommand(String host, Command command) throws
CronParseException {
ProcessInstance processInstance = constructProcessInstance(command,
host);
// cannot construct process instance, return null
if (processInstance == null) {
@@ -731,15 +732,14 @@ public class ProcessServiceImpl implements ProcessService
{
* @param cmdParam cmdParam map
* @return date
*/
- private Date getScheduleTime(Command command, Map<String, String>
cmdParam) {
+ private Date getScheduleTime(Command command, Map<String, String>
cmdParam) throws CronParseException {
Date scheduleTime = command.getScheduleTime();
- if (scheduleTime == null
- && cmdParam != null
- && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ if (scheduleTime == null && cmdParam != null &&
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
Date start =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
- List<Schedule> schedules =
queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
+ List<Schedule> schedules =
+
queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
List<Date> complementDateList =
CronUtils.getSelfFireDateList(start, end, schedules);
if (complementDateList.size() > 0) {
@@ -922,12 +922,13 @@ public class ProcessServiceImpl implements ProcessService
{
* @param host host
* @return process instance
*/
- protected ProcessInstance constructProcessInstance(Command command, String
host) {
+ protected ProcessInstance constructProcessInstance(Command command, String
host) throws CronParseException {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
- processDefinition =
this.findProcessDefinition(command.getProcessDefinitionCode(),
command.getProcessDefinitionVersion());
+ processDefinition =
+ this.findProcessDefinition(command.getProcessDefinitionCode(),
command.getProcessDefinitionVersion());
if (processDefinition == null) {
logger.error("cannot find the work process define! define code :
{}", command.getProcessDefinitionCode());
return null;
@@ -1122,7 +1123,7 @@ public class ProcessServiceImpl implements ProcessService
{
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
- Map<String, String> cmdParam) {
+ Map<String, String> cmdParam) throws
CronParseException {
if (!processInstance.isComplementData()) {
return;
}
@@ -1130,16 +1131,16 @@ public class ProcessServiceImpl implements
ProcessService {
Date start =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Date> complementDate = Lists.newLinkedList();
- if(start != null && end != null){
- List<Schedule> listSchedules =
queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+ if (start != null && end != null) {
+ List<Schedule> listSchedules =
+
queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
complementDate = CronUtils.getSelfFireDateList(start, end,
listSchedules);
}
- if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+ if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST))
{
complementDate = CronUtils.getSelfScheduleDateList(cmdParam);
}
- if (complementDate.size() > 0
- && Flag.NO == processInstance.getIsSubProcess()) {
+ if (complementDate.size() > 0 && Flag.NO ==
processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementDate.get(0));
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java
index b53062e975..1e2b2548eb 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java
@@ -24,9 +24,10 @@ import static
com.cronutils.model.field.expression.FieldExpressionFactory.questi
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.service.corn.CronUtils;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
-import java.text.ParseException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.Date;
import org.junit.Assert;
@@ -59,15 +60,9 @@ public class CronUtilsTest {
*/
@Test
public void testCronAsString() {
- Cron cron =
CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
- .withYear(always())
- .withDoW(questionMark())
- .withMonth(always())
- .withDoM(always())
- .withHour(always())
- .withMinute(every(5))
- .withSecond(on(0))
- .instance();
+ Cron cron =
CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)).withYear(always())
+
.withDoW(questionMark()).withMonth(always()).withDoM(always()).withHour(always()).withMinute(every(5))
+ .withSecond(on(0)).instance();
// Obtain the string expression
String cronAsString = cron.asString();
@@ -78,11 +73,9 @@ public class CronUtilsTest {
/**
* cron parse test
- *
- * @throws ParseException if error throws ParseException
*/
@Test
- public void testCronParse() throws ParseException {
+ public void testCronParse() throws CronParseException {
String strCrontab = "0 1 2 3 * ? *";
Cron depCron = CronUtils.parse2Cron(strCrontab);
@@ -96,11 +89,9 @@ public class CronUtilsTest {
/**
* schedule type test
- *
- * @throws ParseException if error throws ParseException
*/
@Test
- public void testScheduleType() throws ParseException {
+ public void testScheduleType() throws CronParseException {
CycleEnum cycleEnum = CronUtils.getMaxCycle(CronUtils.parse2Cron("0
*/1 * * * ? *"));
Assert.assertEquals("MINUTE", cycleEnum.name());
@@ -129,23 +120,15 @@ public class CronUtilsTest {
* test
*/
@Test
- public void test2() {
- Cron cron1 =
CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
- .withYear(always())
- .withDoW(questionMark())
- .withMonth(always())
- .withDoM(always())
- .withHour(always())
- .withMinute(every(5))
- .withSecond(on(0))
- .instance();
+ public void test2() throws CronParseException {
+ Cron cron1 =
CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)).withYear(always())
+
.withDoW(questionMark()).withMonth(always()).withDoM(always()).withHour(always()).withMinute(every(5))
+ .withSecond(on(0)).instance();
// minute cycle
- String[] cronArayy = new String[] {"* * * * * ? *", "* 0 * * * ? *",
- "* 5 * * 3/5 ? *", "0 0 * * * ? *", "0 0 7 * 1 ? *", "0 0 7 *
1/1 ? *", "0 0 7 * 1-2 ? *", "0 0 7 * 1,2 ? *"};
+ String[] cronArayy =
+ new String[] {"* * * * * ? *", "* 0 * * * ? *", "* 5 * * 3/5 ? *",
"0 0 * * * ? *", "0 0 7 * 1 ? *",
+ "0 0 7 * 1/1 ? *", "0 0 7 * 1-2 ? *", "0 0 7 * 1,2 ? *"};
for (String minCrontab : cronArayy) {
- if (!org.quartz.CronExpression.isValidExpression(minCrontab)) {
- throw new RuntimeException(minCrontab + " verify failure, cron
expression not valid");
- }
Cron cron = CronUtils.parse2Cron(minCrontab);
CronField minField = cron.retrieve(CronFieldName.MINUTE);
logger.info("minField instanceof Between:" +
(minField.getExpression() instanceof Between));
@@ -166,7 +149,8 @@ public class CronUtilsTest {
logger.info("dayOfMonthField instanceof Every:" +
(dayOfMonthField.getExpression() instanceof Every));
logger.info("dayOfMonthField instanceof On:" +
(dayOfMonthField.getExpression() instanceof On));
logger.info("dayOfMonthField instanceof And:" +
(dayOfMonthField.getExpression() instanceof And));
- logger.info("dayOfMonthField instanceof QuestionMark:" +
(dayOfMonthField.getExpression() instanceof QuestionMark));
+ logger.info(
+ "dayOfMonthField instanceof QuestionMark:" +
(dayOfMonthField.getExpression() instanceof QuestionMark));
CronField monthField = cron.retrieve(CronFieldName.MONTH);
logger.info("monthField instanceof Between:" +
(monthField.getExpression() instanceof Between));
@@ -182,7 +166,8 @@ public class CronUtilsTest {
logger.info("dayOfWeekField instanceof Every:" +
(dayOfWeekField.getExpression() instanceof Every));
logger.info("dayOfWeekField instanceof On:" +
(dayOfWeekField.getExpression() instanceof On));
logger.info("dayOfWeekField instanceof And:" +
(dayOfWeekField.getExpression() instanceof And));
- logger.info("dayOfWeekField instanceof QuestionMark:" +
(dayOfWeekField.getExpression() instanceof QuestionMark));
+ logger.info(
+ "dayOfWeekField instanceof QuestionMark:" +
(dayOfWeekField.getExpression() instanceof QuestionMark));
CronField yearField = cron.retrieve(CronFieldName.YEAR);
logger.info("yearField instanceof Between:" +
(yearField.getExpression() instanceof Between));
@@ -203,27 +188,39 @@ public class CronUtilsTest {
}
@Test
- public void getSelfFireDateList() throws ParseException {
- Date from = DateUtils.stringToDate("2020-01-01 00:00:00");
- Date to = DateUtils.stringToDate("2020-01-31 00:00:00");
+ public void getSelfFireDateList() throws CronParseException {
+ ZonedDateTime from =
+ ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01
00:00:00").toInstant(), ZoneId.systemDefault());
+ ZonedDateTime to =
+ ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-31
00:00:00").toInstant(), ZoneId.systemDefault());
// test date
- Assert.assertEquals(0, CronUtils.getSelfFireDateList(to, from, "0 0 0
* * ? ").size());
- // test error cron
- Assert.assertEquals(0, CronUtils.getSelfFireDateList(from, to, "0 0 0
* *").size());
+ Assert.assertEquals(0, CronUtils.getFireDateList(to, from, "0 0 0 * *
? ").size());
+ try {
+ // test error cron
+ // should throw exception
+ CronUtils.getFireDateList(from, to, "0 0 0 * *").size();
+ Assert.assertTrue(false);
+ } catch (CronParseException cronParseException) {
+ Assert.assertTrue(true);
+ }
// test cron
- Assert.assertEquals(29, CronUtils.getSelfFireDateList(from, to, "0 0 0
* * ? ").size());
+ Assert.assertEquals(30, CronUtils.getFireDateList(from, to, "0 0 0 * *
? ").size());
// test other
- Assert.assertEquals(30, CronUtils.getFireDateList(from, to,
CronUtils.parse2CronExpression("0 0 0 * * ? ")).size());
- Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to,
CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size());
- from = DateUtils.stringToDate("2020-01-01 00:02:00");
- to = DateUtils.stringToDate("2020-01-01 00:02:00");
- Assert.assertEquals(1, CronUtils.getFireDateList(new
Date(from.getTime() - 1000), to, CronUtils.parse2CronExpression("0 * * * * ?
")).size());
-
- from = DateUtils.stringToDate("2020-01-01 00:02:00");
- to = DateUtils.stringToDate("2020-01-01 00:04:00");
- Assert.assertEquals(2, CronUtils.getFireDateList(new
Date(from.getTime() - 1000),
- new Date(to.getTime() - 1000),
- CronUtils.parse2CronExpression("0 * * * * ? ")).size());
+ Assert.assertEquals(30, CronUtils.getFireDateList(from, to,
CronUtils.parse2Cron("0 0 0 * * ? ")).size());
+ Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to,
CronUtils.parse2Cron("0 0 0 * * ? "), 5).size());
+ from =
+ ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01
00:02:00").toInstant(), ZoneId.systemDefault());
+ to = ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01
00:02:00").toInstant(), ZoneId.systemDefault());
+ Assert.assertEquals(1,
+ CronUtils.getFireDateList(from.minusSeconds(1L), to,
CronUtils.parse2Cron("0 * * * * ? ")).size());
+
+ from =
+ ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01
00:02:00").toInstant(), ZoneId.systemDefault());
+ to = ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01
00:04:00").toInstant(),
+ ZoneId.systemDefault());
+ Assert.assertEquals(2,
+ CronUtils.getFireDateList(from.minusSeconds(1L),
to.minusSeconds(1L), CronUtils.parse2Cron("0 * * * * ? "))
+ .size());
}
@Test
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 2e65a0cb5b..26b9e66045 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -284,7 +285,7 @@ public class ProcessServiceTest {
}
@Test
- public void testHandleCommand() {
+ public void testHandleCommand() throws CronParseException {
//cannot construct process instance, return null;
String host = "127.0.0.1";
@@ -461,7 +462,7 @@ public class ProcessServiceTest {
}
@Test(expected = ServiceException.class)
- public void testDeleteNotExistCommand() {
+ public void testDeleteNotExistCommand() throws CronParseException {
String host = "127.0.0.1";
int definitionVersion = 1;
long definitionCode = 123;
diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml
index 9ef5e94cd0..45a9890738 100644
--- a/dolphinscheduler-worker/pom.xml
+++ b/dolphinscheduler-worker/pom.xml
@@ -34,12 +34,6 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-quartz</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>