This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e411d0 [Improvement][Code style] FIX SPELL WAITTING TO WAITING ,
etc. (#4118)
3e411d0 is described below
commit 3e411d075fffa1efd58484a1a29c54f7bfab9983
Author: felix.wang <[email protected]>
AuthorDate: Tue Dec 1 08:58:55 2020 +0800
[Improvement][Code style] FIX SPELL WAITTING TO WAITING , etc. (#4118)
* FIX SPELL
* FIX SPELL AND Optimizing code conventions
* add ut cannot construct process instance, return null;
* add ut testExportProcessMetaData
* add ut testExportProcessMetaData
* add ut testImportProcessSchedule
* add ut MasterExecThreadTest
* add ut MasterExecThreadTest
* add ut testSubProcessViewTree
* add ut testComplementWithStartNodeList
* add ut testRecurseFindSubProcessId
* add ut testRecurseFindSubProcessId
* add ut testRecurseFindSubProcessId
---
.../api/service/ExecutorService.java | 193 +++++-----
.../service/impl/ProcessDefinitionServiceImpl.java | 30 +-
.../api/service/ExecutorService2Test.java | 28 +-
.../api/service/ProcessDefinitionServiceTest.java | 170 +++++++--
.../apache/dolphinscheduler/common/Constants.java | 16 +-
.../server/master/runner/MasterExecThread.java | 406 +++++++++++----------
.../server/master/MasterExecThreadTest.java | 117 ++++--
.../service/process/ProcessService.java | 64 ++--
.../service/process/ProcessServiceTest.java | 211 ++++++++++-
9 files changed, 849 insertions(+), 386 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index fb735ec..7a0fd0f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -14,39 +14,62 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.api.service;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
+import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.RunMode;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.text.ParseException;
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
/**
* executor service
*/
@Service
-public class ExecutorService extends BaseService{
+public class ExecutorService extends BaseService {
private static final Logger logger =
LoggerFactory.getLogger(ExecutorService.class);
@@ -73,22 +96,22 @@ public class ExecutorService extends BaseService{
/**
* execute process instance
*
- * @param loginUser login user
- * @param projectName project name
- * @param processDefinitionId process Definition Id
- * @param cronTime cron time
- * @param commandType command type
- * @param failureStrategy failuer strategy
- * @param startNodeList start nodelist
- * @param taskDependType node dependency type
- * @param warningType warning type
- * @param warningGroupId notify group id
- * @param receivers receivers
- * @param receiversCc receivers cc
+ * @param loginUser login user
+ * @param projectName project name
+ * @param processDefinitionId process Definition Id
+ * @param cronTime cron time
+ * @param commandType command type
+ * @param failureStrategy failuer strategy
+ * @param startNodeList start nodelist
+ * @param taskDependType node dependency type
+ * @param warningType warning type
+ * @param warningGroupId notify group id
+ * @param receivers receivers
+ * @param receiversCc receivers cc
* @param processInstancePriority process instance priority
* @param workerGroup worker group name
* @param runMode run mode
- * @param timeout timeout
+ * @param timeout timeout
* @return execute process instance code
* @throws ParseException Parse Exception
*/
@@ -101,23 +124,23 @@ public class ExecutorService extends BaseService{
Map<String, Object> result = new HashMap<>();
// timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
- putMsg(result,Status.TASK_TIMEOUT_PARAMS_ERROR);
+ putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
return result;
}
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser,
projectName, project);
- if (checkResultAndAuth != null){
+ if (checkResultAndAuth != null) {
return checkResultAndAuth;
}
// check process define release state
ProcessDefinition processDefinition =
processDefinitionMapper.selectById(processDefinitionId);
result = checkProcessDefinitionValid(processDefinition,
processDefinitionId);
- if(result.get(Constants.STATUS) != Status.SUCCESS){
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
- if (!checkTenantSuitable(processDefinition)){
+ if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process
definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
@@ -129,14 +152,13 @@ public class ExecutorService extends BaseService{
return result;
}
-
/**
* create command
*/
int create = this.createCommand(commandType, processDefinitionId,
taskDependType, failureStrategy, startNodeList, cronTime,
warningType, loginUser.getId(),
- warningGroupId, runMode,processInstancePriority, workerGroup);
- if(create > 0 ){
+ warningGroupId, runMode, processInstancePriority, workerGroup);
+ if (create > 0) {
/**
* according to the process definition ID updateProcessInstance
and CC recipient
*/
@@ -152,6 +174,7 @@ public class ExecutorService extends BaseService{
/**
* check whether master exists
+ *
* @param result result
* @return master exists return true , otherwise return false
*/
@@ -167,7 +190,6 @@ public class ExecutorService extends BaseService{
return true;
}
-
/**
* check whether the process definition can be executed
*
@@ -175,22 +197,20 @@ public class ExecutorService extends BaseService{
* @param processDefineId process definition id
* @return check result code
*/
- public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition
processDefinition, int processDefineId){
+ public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition
processDefinition, int processDefineId) {
Map<String, Object> result = new HashMap<>();
if (processDefinition == null) {
// check process definition exists
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,processDefineId);
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE)
{
// check process definition online
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,processDefineId);
- }else{
+ putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId);
+ } else {
result.put(Constants.STATUS, Status.SUCCESS);
}
return result;
}
-
-
/**
* do action to process instance:pause, stop, repeat, recover from pause,
recover from stop
*
@@ -214,7 +234,6 @@ public class ExecutorService extends BaseService{
return result;
}
-
ProcessInstance processInstance =
processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
@@ -222,7 +241,7 @@ public class ExecutorService extends BaseService{
}
ProcessDefinition processDefinition =
processService.findProcessDefineById(processInstance.getProcessDefinitionId());
- if(executeType != ExecuteType.STOP && executeType !=
ExecuteType.PAUSE){
+ if (executeType != ExecuteType.STOP && executeType !=
ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition,
processInstance.getProcessDefinitionId());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
@@ -234,7 +253,7 @@ public class ExecutorService extends BaseService{
if (status != Status.SUCCESS) {
return checkResult;
}
- if (!checkTenantSuitable(processDefinition)){
+ if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process
definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
@@ -275,6 +294,7 @@ public class ExecutorService extends BaseService{
/**
* check tenant suitable
+ *
* @param processDefinition process definition
* @return true if tenant suitable, otherwise return false
*/
@@ -315,7 +335,7 @@ public class ExecutorService extends BaseService{
}
break;
case RECOVER_SUSPENDED_PROCESS:
- if (executionStatus.typeIsPause()||
executionStatus.typeIsCancel()) {
+ if (executionStatus.typeIsPause() ||
executionStatus.typeIsCancel()) {
checkResult = true;
}
break;
@@ -323,7 +343,7 @@ public class ExecutorService extends BaseService{
break;
}
if (!checkResult) {
- putMsg(result,Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), executionStatus.toString(), executeType.toString());
+ putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), executionStatus.toString(), executeType.toString());
} else {
putMsg(result, Status.SUCCESS);
}
@@ -331,7 +351,7 @@ public class ExecutorService extends BaseService{
}
/**
- * prepare to update process instance command type and status
+ * prepare to update process instance command type and status
*
* @param processInstance process instance
* @param commandType command type
@@ -370,11 +390,11 @@ public class ExecutorService extends BaseService{
command.setCommandType(commandType);
command.setProcessDefinitionId(processDefinitionId);
command.setCommandParam(String.format("{\"%s\":%d}",
- CMDPARAM_RECOVER_PROCESS_ID_STRING, instanceId));
+ CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId));
command.setExecutorId(loginUser.getId());
- if(!processService.verifyIsNeedCreateCommand(command)){
- putMsg(result,
Status.PROCESS_INSTANCE_EXECUTING_COMMAND,processDefinitionId);
+ if (!processService.verifyIsNeedCreateCommand(command)) {
+ putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND,
processDefinitionId);
return result;
}
@@ -391,28 +411,29 @@ public class ExecutorService extends BaseService{
/**
* check if sub processes are offline before starting process definition
+ *
* @param processDefineId process definition id
* @return check result code
*/
public Map<String, Object> startCheckByProcessDefinedId(int
processDefineId) {
Map<String, Object> result = new HashMap<>();
- if (processDefineId == 0){
+ if (processDefineId == 0) {
logger.error("process definition id is null");
- putMsg(result,Status.REQUEST_PARAMS_NOT_VALID_ERROR,"process
definition id");
+ putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process
definition id");
}
List<Integer> ids = new ArrayList<>();
processService.recurseFindSubProcessId(processDefineId, ids);
Integer[] idArray = ids.toArray(new Integer[ids.size()]);
- if (!ids.isEmpty()){
+ if (!ids.isEmpty()) {
List<ProcessDefinition> processDefinitionList =
processDefinitionMapper.queryDefinitionListByIdList(idArray);
- if (processDefinitionList != null){
- for (ProcessDefinition processDefinition :
processDefinitionList){
+ if (processDefinitionList != null) {
+ for (ProcessDefinition processDefinition :
processDefinitionList) {
/**
* if there is no online process, exit directly
*/
- if (processDefinition.getReleaseState() !=
ReleaseState.ONLINE){
- putMsg(result,Status.PROCESS_DEFINE_NOT_RELEASE,
processDefinition.getName());
+ if (processDefinition.getReleaseState() !=
ReleaseState.ONLINE) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
processDefinition.getName());
logger.info("not release process definition id: {} ,
name : {}",
processDefinition.getId(),
processDefinition.getName());
return result;
@@ -431,13 +452,13 @@ public class ExecutorService extends BaseService{
* @param processInstanceId process instance id
* @return receivers cc list
*/
- public Map<String, Object> getReceiverCc(Integer processDefineId,Integer
processInstanceId) {
+ public Map<String, Object> getReceiverCc(Integer processDefineId, Integer
processInstanceId) {
Map<String, Object> result = new HashMap<>();
- logger.info("processInstanceId {}",processInstanceId);
- if(processDefineId == null && processInstanceId == null){
+ logger.info("processInstanceId {}", processInstanceId);
+ if (processDefineId == null && processInstanceId == null) {
throw new RuntimeException("You must set values for parameters
processDefineId or processInstanceId");
}
- if(processDefineId == null && processInstanceId != null) {
+ if (processDefineId == null && processInstanceId != null) {
ProcessInstance processInstance =
processInstanceMapper.selectById(processInstanceId);
if (processInstance == null) {
throw new RuntimeException("processInstanceId is not exists");
@@ -445,24 +466,24 @@ public class ExecutorService extends BaseService{
processDefineId = processInstance.getProcessDefinitionId();
}
ProcessDefinition processDefinition =
processDefinitionMapper.selectById(processDefineId);
- if (processDefinition == null){
- throw new RuntimeException(String.format("processDefineId %d is
not exists",processDefineId));
+ if (processDefinition == null) {
+ throw new RuntimeException(String.format("processDefineId %d is
not exists", processDefineId));
}
String receivers = processDefinition.getReceivers();
String receiversCc = processDefinition.getReceiversCc();
- Map<String,String> dataMap = new HashMap<>();
- dataMap.put(Constants.RECEIVERS,receivers);
- dataMap.put(Constants.RECEIVERS_CC,receiversCc);
+ Map<String, String> dataMap = new HashMap<>();
+ dataMap.put(Constants.RECEIVERS, receivers);
+ dataMap.put(Constants.RECEIVERS_CC, receiversCc);
result.put(Constants.DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
-
/**
* create command
+ *
* @param commandType commandType
* @param processDefineId processDefineId
* @param nodeDep nodeDep
@@ -476,37 +497,36 @@ public class ExecutorService extends BaseService{
* @param processInstancePriority processInstancePriority
* @param workerGroup workerGroup
* @return command id
- * @throws ParseException
*/
private int createCommand(CommandType commandType, int processDefineId,
TaskDependType nodeDep, FailureStrategy
failureStrategy,
String startNodeList, String schedule,
WarningType warningType,
int executorId, int warningGroupId,
- RunMode runMode,Priority
processInstancePriority, String workerGroup) throws ParseException {
+ RunMode runMode, Priority
processInstancePriority, String workerGroup) throws ParseException {
/**
* instantiate command schedule instance
*/
Command command = new Command();
- Map<String,String> cmdParam = new HashMap<>();
- if(commandType == null){
+ Map<String, String> cmdParam = new HashMap<>();
+ if (commandType == null) {
command.setCommandType(CommandType.START_PROCESS);
- }else{
+ } else {
command.setCommandType(commandType);
}
command.setProcessDefinitionId(processDefineId);
- if(nodeDep != null){
+ if (nodeDep != null) {
command.setTaskDependType(nodeDep);
}
- if(failureStrategy != null){
+ if (failureStrategy != null) {
command.setFailureStrategy(failureStrategy);
}
- if(StringUtils.isNotEmpty(startNodeList)){
- cmdParam.put(CMDPARAM_START_NODE_NAMES, startNodeList);
+ if (StringUtils.isNotEmpty(startNodeList)) {
+ cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList);
}
- if(warningType != null){
+ if (warningType != null) {
command.setWarningType(warningType);
}
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
@@ -517,32 +537,32 @@ public class ExecutorService extends BaseService{
Date start = null;
Date end = null;
- if(StringUtils.isNotEmpty(schedule)){
+ if (StringUtils.isNotEmpty(schedule)) {
String[] interval = schedule.split(",");
- if(interval.length == 2){
+ if (interval.length == 2) {
start = DateUtils.getScheduleDate(interval[0]);
end = DateUtils.getScheduleDate(interval[1]);
}
}
// determine whether to complement
- if(commandType == CommandType.COMPLEMENT_DATA){
+ if (commandType == CommandType.COMPLEMENT_DATA) {
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
- if(null != start && null != end && !start.after(end)){
- if(runMode == RunMode.RUN_MODE_SERIAL){
+ if (null != start && null != end && !start.after(end)) {
+ if (runMode == RunMode.RUN_MODE_SERIAL) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
- }else if (runMode == RunMode.RUN_MODE_PARALLEL){
+ } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
List<Date> listDate = new LinkedList<>();
- if(!CollectionUtils.isEmpty(schedules)){
+ if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule item : schedules) {
listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
}
}
- if(!CollectionUtils.isEmpty(listDate)){
+ if (!CollectionUtils.isEmpty(listDate)) {
// loop by schedule date
for (Date date : listDate) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(date));
@@ -551,10 +571,10 @@ public class ExecutorService extends BaseService{
processService.createCommand(command);
}
return listDate.size();
- }else{
+ } else {
// loop by day
int runCunt = 0;
- while(!start.after(end)) {
+ while (!start.after(end)) {
runCunt += 1;
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE,
DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(start));
@@ -565,11 +585,11 @@ public class ExecutorService extends BaseService{
return runCunt;
}
}
- }else{
+ } else {
logger.error("there is not valid schedule date for the process
definition: id:{},date:{}",
processDefineId, schedule);
}
- }else{
+ } else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
}
@@ -579,11 +599,6 @@ public class ExecutorService extends BaseService{
/**
* check result and auth
- *
- * @param loginUser
- * @param projectName
- * @param project
- * @return
*/
private Map<String, Object> checkResultAndAuth(User loginUser, String
projectName, Project project) {
// check project auth
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index e86ea83..e4b01e4 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.service.impl;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
@@ -78,7 +78,6 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -86,7 +85,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -161,13 +159,14 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @return create result code
* @throws JsonProcessingException JsonProcessingException
*/
+ @Override
public Map<String, Object> createProcessDefinition(User loginUser,
String projectName,
String name,
String
processDefinitionJson,
String desc,
String locations,
- String connects) {
+ String connects) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
@@ -230,6 +229,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
/**
* get resource ids
+ *
* @param processData process data
* @return resource ids
*/
@@ -264,6 +264,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
}
return sb.toString();
}
+
/**
* query process definition list
*
@@ -271,6 +272,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param projectName project name
* @return definition list
*/
+ @Override
public Map<String, Object> queryProcessDefinitionList(User loginUser,
String projectName) {
HashMap<String, Object> result = new HashMap<>(5);
@@ -300,6 +302,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param userId user id
* @return process definition page
*/
+ @Override
public Map<String, Object> queryProcessDefinitionListPaging(User
loginUser, String projectName, String searchVal, Integer pageNo, Integer
pageSize, Integer userId) {
Map<String, Object> result = new HashMap<>();
@@ -332,6 +335,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param processId process definition id
* @return process definition detail
*/
+ @Override
public Map<String, Object> queryProcessDefinitionById(User loginUser,
String projectName, Integer processId) {
Map<String, Object> result = new HashMap<>();
@@ -366,6 +370,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param connects connects for nodes
* @return update result code
*/
+ @Override
public Map<String, Object> updateProcessDefinition(User loginUser,
String projectName,
int id,
@@ -455,6 +460,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param name name
* @return true if process definition name not exists, otherwise false
*/
+ @Override
public Map<String, Object> verifyProcessDefinitionName(User loginUser,
String projectName, String name) {
Map<String, Object> result = new HashMap<>();
@@ -482,6 +488,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param processDefinitionId process definition id
* @return delete result code
*/
+ @Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> deleteProcessDefinitionById(User loginUser,
String projectName, Integer processDefinitionId) {
@@ -513,9 +520,9 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
return result;
}
// check process instances is already running
- List<ProcessInstance> processInstances =
processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId,
Constants.NOT_TERMINATED_STATES);
+ List<ProcessInstance> processInstances =
processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId,
Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
- putMsg(result,
Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL,processInstances.size());
+ putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL,
processInstances.size());
return result;
}
@@ -554,6 +561,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param releaseState release state
* @return release result code
*/
+ @Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> releaseProcessDefinition(User loginUser, String
projectName, int id, int releaseState) {
HashMap<String, Object> result = new HashMap<>();
@@ -621,6 +629,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
/**
* batch export process definition by ids
*/
+ @Override
public void batchExportProcessDefinitionByIds(User loginUser, String
projectName, String processDefinitionIds, HttpServletResponse response) {
if (StringUtils.isEmpty(processDefinitionIds)) {
@@ -699,6 +708,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
/**
* get export process metadata string
+ *
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @return export process metadata string
@@ -790,6 +800,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param currentProjectName current project name
* @return import process
*/
+ @Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> importProcessDefinition(User loginUser,
MultipartFile file, String currentProjectName) {
Map<String, Object> result = new HashMap<>(5);
@@ -1123,6 +1134,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param processDefinitionJson process definition json
* @return check result code
*/
+ @Override
public Map<String, Object> checkProcessNodeList(ProcessData processData,
String processDefinitionJson) {
Map<String, Object> result = new HashMap<>();
@@ -1174,6 +1186,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param defineId define id
* @return task node list
*/
+ @Override
public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId)
{
Map<String, Object> result = new HashMap<>();
@@ -1210,6 +1223,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param defineIdList define id list
* @return task node list
*/
+ @Override
public Map<String, Object> getTaskNodeListByDefinitionIdList(String
defineIdList) {
Map<String, Object> result = new HashMap<>();
@@ -1247,6 +1261,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @param projectId project id
* @return process definitions in the project
*/
+ @Override
public Map<String, Object> queryProcessDefinitionAllByProjectId(Integer
projectId) {
HashMap<String, Object> result = new HashMap<>(5);
@@ -1266,6 +1281,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
* @return tree view json data
* @throws Exception exception
*/
+ @Override
public Map<String, Object> viewTree(Integer processId, Integer limit)
throws Exception {
Map<String, Object> result = new HashMap<>();
@@ -1350,7 +1366,7 @@ public class ProcessDefinitionServiceImpl extends
BaseService implements
String taskJson = taskInstance.getTaskJson();
taskNode = JSONUtils.parseObject(taskJson,
TaskNode.class);
subProcessId =
Integer.parseInt(JSONUtils.parseObject(
-
taskNode.getParams()).path(CMDPARAM_SUB_PROCESS_DEFINE_ID).asText());
+
taskNode.getParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
}
treeViewDto.getInstances().add(new
Instance(taskInstance.getId(), taskInstance.getName(),
taskInstance.getTaskType(), taskInstance.getState().toString()
, taskInstance.getStartTime(),
taskInstance.getEndTime(), taskInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - startTime.getTime()),
subProcessId));
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
index a4c0c6b..9f8d1b2 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.api.service;
import static org.mockito.ArgumentMatchers.any;
@@ -137,9 +138,31 @@ public class ExecutorService2Test {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
} catch (Exception e) {
+ //ignore
+ }
+ }
+
+ /**
+ * not complement
+ */
+ @Test
+ public void testComplementWithStartNodeList() throws ParseException {
+ try {
+
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Map<String, Object> result =
executorService.execProcessInstance(loginUser, projectName,
+ processDefinitionId, cronTime, CommandType.START_PROCESS,
+ null, "n1,n2",
+ null, null, 0,
+ "", "", RunMode.RUN_MODE_SERIAL,
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+ Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+ verify(processService, times(1)).createCommand(any(Command.class));
+ } catch (Exception e) {
+ //ignore
}
}
+
/**
* date error
*/
@@ -156,6 +179,7 @@ public class ExecutorService2Test {
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR,
result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
} catch (Exception e) {
+ //ignore
}
}
@@ -175,6 +199,7 @@ public class ExecutorService2Test {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
} catch (Exception e) {
+ //ignore
}
}
@@ -194,6 +219,7 @@ public class ExecutorService2Test {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService,
times(31)).createCommand(any(Command.class));
} catch (Exception e) {
+ //ignore
}
}
@@ -213,10 +239,10 @@ public class ExecutorService2Test {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService,
times(15)).createCommand(any(Command.class));
} catch (Exception e) {
+ //ignore
}
}
-
@Test
public void testNoMsterServers() throws ParseException {
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new
ArrayList<Server>());
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 55d7b09..e197e56 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.api.service;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.enums.Status;
@@ -66,6 +68,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -83,33 +88,6 @@ import
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@RunWith(MockitoJUnitRunner.class)
public class ProcessDefinitionServiceTest {
- @InjectMocks
- private ProcessDefinitionServiceImpl processDefinitionService;
-
- @Mock
- private ProcessDefinitionMapper processDefineMapper;
-
- @Mock
- private ProjectMapper projectMapper;
-
- @Mock
- private ProjectServiceImpl projectService;
-
- @Mock
- private ScheduleMapper scheduleMapper;
-
- @Mock
- private ProcessService processService;
-
- @Mock
- private ProcessInstanceService processInstanceService;
-
- @Mock
- private TaskInstanceMapper taskInstanceMapper;
-
- @Mock
- private ProcessDefinitionVersionService processDefinitionVersionService;
-
private static final String SHELL_JSON = "{\n"
+ " \"globalParams\": [\n"
+ " \n"
@@ -150,7 +128,6 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
-
private static final String CYCLE_SHELL_JSON = "{\n"
+ " \"globalParams\": [\n"
+ " \n"
@@ -253,6 +230,24 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
+ @InjectMocks
+ private ProcessDefinitionServiceImpl processDefinitionService;
+ @Mock
+ private ProcessDefinitionMapper processDefineMapper;
+ @Mock
+ private ProjectMapper projectMapper;
+ @Mock
+ private ProjectServiceImpl projectService;
+ @Mock
+ private ScheduleMapper scheduleMapper;
+ @Mock
+ private ProcessService processService;
+ @Mock
+ private ProcessInstanceService processInstanceService;
+ @Mock
+ private TaskInstanceMapper taskInstanceMapper;
+ @Mock
+ private ProcessDefinitionVersionService processDefinitionVersionService;
@Test
public void testQueryProcessDefinitionList() {
@@ -751,6 +746,70 @@ public class ProcessDefinitionServiceTest {
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(),
"shell-1")).thenReturn(taskInstance);
Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS,
taskNotNuLLRes.get(Constants.STATUS));
+
+ }
+
+ @Test
+ public void testSubProcessViewTree() throws Exception {
+
+ ProcessDefinition processDefinition = getProcessDefinition();
+ processDefinition.setProcessDefinitionJson(SHELL_JSON);
+ List<ProcessInstance> processInstanceList = new ArrayList<>();
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setName("test_instance");
+ processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ processInstance.setHost("192.168.xx.xx");
+ processInstance.setStartTime(new Date());
+ processInstance.setEndTime(new Date());
+ processInstanceList.add(processInstance);
+
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setStartTime(new Date());
+ taskInstance.setEndTime(new Date());
+ taskInstance.setTaskType("SUB_PROCESS");
+ taskInstance.setId(1);
+ taskInstance.setName("test_task_instance");
+ taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ taskInstance.setHost("192.168.xx.xx");
+ taskInstance.setTaskJson("{\n"
+ + " \"conditionResult\": {\n"
+ + " \"failedNode\": [\n"
+ + " \"\"\n"
+ + " ],\n"
+ + " \"successNode\": [\n"
+ + " \"\"\n"
+ + " ]\n"
+ + " },\n"
+ + " \"delayTime\": \"0\",\n"
+ + " \"dependence\": {},\n"
+ + " \"description\": \"\",\n"
+ + " \"id\": \"1\",\n"
+ + " \"maxRetryTimes\": \"0\",\n"
+ + " \"name\": \"test_task_instance\",\n"
+ + " \"params\": {\n"
+ + " \"processDefinitionId\": \"222\",\n"
+ + " \"resourceList\": []\n"
+ + " },\n"
+ + " \"preTasks\": [],\n"
+ + " \"retryInterval\": \"1\",\n"
+ + " \"runFlag\": \"NORMAL\",\n"
+ + " \"taskInstancePriority\": \"MEDIUM\",\n"
+ + " \"timeout\": {\n"
+ + " \"enable\": false,\n"
+ + " \"interval\": null,\n"
+ + " \"strategy\": \"\"\n"
+ + " },\n"
+ + " \"type\": \"SUB_PROCESS\",\n"
+ + " \"workerGroup\": \"default\"\n"
+ + "}");
+ //task instance exist
+
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+ Mockito.when(processInstanceService.queryByProcessDefineId(46,
10)).thenReturn(processInstanceList);
+
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(),
"shell-1")).thenReturn(taskInstance);
+ Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(46, 10);
+ Assert.assertEquals(Status.SUCCESS,
taskNotNuLLRes.get(Constants.STATUS));
+
}
@Test
@@ -973,7 +1032,7 @@ public class ProcessDefinitionServiceTest {
}
@Test
- public void testBatchExportProcessDefinitionByIds() {
+ public void testBatchExportProcessDefinitionByIds() throws IOException {
processDefinitionService.batchExportProcessDefinitionByIds(
null, null, null, null);
@@ -991,6 +1050,28 @@ public class ProcessDefinitionServiceTest {
processDefinitionService.batchExportProcessDefinitionByIds(
loginUser, projectName, "1", null);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(1);
+
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ +
"{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ +
",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo
\\\"123123\\\"\",\"resourceList\":[]}"
+ +
",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ +
",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ +
",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+ Map<String, Object> checkResult = new HashMap<>();
+ checkResult.put(Constants.STATUS, Status.SUCCESS);
+
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectName)).thenReturn(checkResult);
+
Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition);
+ HttpServletResponse response = mock(HttpServletResponse.class);
+
+ ServletOutputStream outputStream = mock(ServletOutputStream.class);
+ when(response.getOutputStream()).thenReturn(outputStream);
+ processDefinitionService.batchExportProcessDefinitionByIds(
+ loginUser, projectName, "1", response);
+
}
@Test
@@ -1182,4 +1263,35 @@ public class ProcessDefinitionServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
+
+ @Test
+ public void testExportProcessMetaData() {
+ Integer processDefinitionId = 111;
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(processDefinitionId);
+
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ + "{\"failedNode\":[\"\"],\"successNode\":"
+ + "[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ +
"\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\","
+ + "\"params\":{\"localParams\":[],\"rawScript\":\"echo
\\\"123123\\\"\",\"resourceList\":[]},"
+ +
"\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ +
"\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\","
+ +
"\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinitionId,
processDefinition));
+ }
+
+ @Test
+ public void testImportProcessSchedule() {
+ User loginUser = new User();
+ loginUser.setId(1);
+ loginUser.setUserType(UserType.ADMIN_USER);
+ Integer processDefinitionId = 111;
+ String processDefinitionName = "testProcessDefinition";
+ String projectName = "project_test1";
+ Map<String, Object> result = new HashMap<>();
+ putMsg(result, Status.PROJECT_NOT_FOUNT);
+ ProcessMeta processMeta = new ProcessMeta();
+ Assert.assertEquals(0,
processDefinitionService.importProcessSchedule(loginUser, projectName,
processMeta, processDefinitionName, processDefinitionId));
+ }
+
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 4a696d2..e473428 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -442,21 +442,21 @@ public final class Constants {
/**
* command parameter keys
*/
- public static final String CMDPARAM_RECOVER_PROCESS_ID_STRING =
"ProcessInstanceId";
+ public static final String CMD_PARAM_RECOVER_PROCESS_ID_STRING =
"ProcessInstanceId";
- public static final String CMDPARAM_RECOVERY_START_NODE_STRING =
"StartNodeIdList";
+ public static final String CMD_PARAM_RECOVERY_START_NODE_STRING =
"StartNodeIdList";
- public static final String CMDPARAM_RECOVERY_WAITTING_THREAD =
"WaittingThreadInstanceId";
+ public static final String CMD_PARAM_RECOVERY_WAITING_THREAD =
"WaitingThreadInstanceId";
- public static final String CMDPARAM_SUB_PROCESS = "processInstanceId";
+ public static final String CMD_PARAM_SUB_PROCESS = "processInstanceId";
- public static final String CMDPARAM_EMPTY_SUB_PROCESS = "0";
+ public static final String CMD_PARAM_EMPTY_SUB_PROCESS = "0";
- public static final String CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID =
"parentProcessInstanceId";
+ public static final String CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID =
"parentProcessInstanceId";
- public static final String CMDPARAM_SUB_PROCESS_DEFINE_ID =
"processDefinitionId";
+ public static final String CMD_PARAM_SUB_PROCESS_DEFINE_ID =
"processDefinitionId";
- public static final String CMDPARAM_START_NODE_NAMES = "StartNodeNameList";
+ public static final String CMD_PARAM_START_NODE_NAMES =
"StartNodeNameList";
/**
* complement data start date
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 2bc8031..66de086 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master.runner;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVERY_START_NODE_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_START_NODE_NAMES;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
@@ -35,7 +36,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
-import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -87,22 +87,18 @@ public class MasterExecThread implements Runnable {
* logger of MasterExecThread
*/
private static final Logger logger =
LoggerFactory.getLogger(MasterExecThread.class);
-
- /**
- * process instance
- */
- private ProcessInstance processInstance;
-
/**
- * runing TaskNode
+ * runing TaskNode
*/
- private final Map<MasterBaseTaskExecThread,Future<Boolean>> activeTaskNode
= new ConcurrentHashMap<>();
-
+ private final Map<MasterBaseTaskExecThread, Future<Boolean>>
activeTaskNode = new ConcurrentHashMap<>();
/**
* task exec service
*/
private final ExecutorService taskExecService;
-
+ /**
+ * process instance
+ */
+ private ProcessInstance processInstance;
/**
* submit failure nodes
*/
@@ -116,7 +112,7 @@ public class MasterExecThread implements Runnable {
/**
* error task list
*/
- private Map<String,TaskInstance> errorTaskList = new ConcurrentHashMap<>();
+ private Map<String, TaskInstance> errorTaskList = new
ConcurrentHashMap<>();
/**
* complete task list
@@ -159,7 +155,7 @@ public class MasterExecThread implements Runnable {
private DAG<String, TaskNode, TaskNodeRelation> dag;
/**
- * process service
+ * process service
*/
private ProcessService processService;
@@ -172,9 +168,16 @@ public class MasterExecThread implements Runnable {
*
*/
private NettyRemotingClient nettyRemotingClient;
+ /**
+ * submit post node
+ *
+ * @param parentNodeName parent node name
+ */
+ private Map<String, Object> propToValue = new ConcurrentHashMap<String,
Object>();
/**
* constructor of MasterExecThread
+ *
* @param processInstance processInstance
* @param processService processService
* @param nettyRemotingClient nettyRemotingClient
@@ -195,39 +198,36 @@ public class MasterExecThread implements Runnable {
this.alertManager = alertManager;
}
-
-
-
@Override
public void run() {
// process instance is null
- if (processInstance == null){
+ if (processInstance == null) {
logger.info("process instance is not exists");
return;
}
// check to see if it's done
- if (processInstance.getState().typeIsFinished()){
- logger.info("process instance is done :
{}",processInstance.getId());
+ if (processInstance.getState().typeIsFinished()) {
+ logger.info("process instance is done : {}",
processInstance.getId());
return;
}
try {
- if (processInstance.isComplementData() && Flag.NO ==
processInstance.getIsSubProcess()){
+ if (processInstance.isComplementData() && Flag.NO ==
processInstance.getIsSubProcess()) {
// sub process complement data
executeComplementProcess();
- }else{
+ } else {
// execute flow
executeProcess();
}
- }catch (Exception e){
+ } catch (Exception e) {
logger.error("master exec thread exception", e);
logger.error("process execute failed, process id:{}",
processInstance.getId());
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setEndTime(new Date());
processService.updateProcessInstance(processInstance);
- }finally {
+ } finally {
taskExecService.shutdown();
// post handle
postHandle();
@@ -236,6 +236,7 @@ public class MasterExecThread implements Runnable {
/**
* execute process
+ *
* @throws Exception exception
*/
private void executeProcess() throws Exception {
@@ -246,6 +247,7 @@ public class MasterExecThread implements Runnable {
/**
* execute complement process
+ *
* @throws Exception exception
*/
private void executeComplementProcess() throws Exception {
@@ -260,7 +262,7 @@ public class MasterExecThread implements Runnable {
int processDefinitionId = processInstance.getProcessDefinitionId();
List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
List<Date> listDate = Lists.newLinkedList();
- if(!CollectionUtils.isEmpty(schedules)){
+ if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule schedule : schedules) {
listDate.addAll(CronUtils.getSelfFireDateList(startDate,
endDate, schedule.getCrontab()));
}
@@ -268,26 +270,26 @@ public class MasterExecThread implements Runnable {
// get first fire date
Iterator<Date> iterator = null;
Date scheduleDate = null;
- if(!CollectionUtils.isEmpty(listDate)) {
+ if (!CollectionUtils.isEmpty(listDate)) {
iterator = listDate.iterator();
scheduleDate = iterator.next();
processInstance.setScheduleTime(scheduleDate);
processService.updateProcessInstance(processInstance);
- }else{
+ } else {
scheduleDate = processInstance.getScheduleTime();
- if(scheduleDate == null){
+ if (scheduleDate == null) {
scheduleDate = startDate;
}
}
- while(Stopper.isRunning()){
+ while (Stopper.isRunning()) {
logger.info("process {} start to complement {} data",
processInstance.getId(),
DateUtils.dateToString(scheduleDate));
// prepare dag and other info
prepareProcess();
- if(dag == null){
+ if (dag == null) {
logger.error("process {} dag is null, please check out
parameters",
processInstance.getId());
processInstance.setState(ExecutionStatus.SUCCESS);
@@ -300,23 +302,23 @@ public class MasterExecThread implements Runnable {
endProcess();
// process instance failure ,no more complements
- if(!processInstance.getState().typeIsSuccess()){
+ if (!processInstance.getState().typeIsSuccess()) {
logger.info("process {} state {}, complement not completely!",
processInstance.getId(), processInstance.getState());
break;
}
// current process instance success ,next execute
- if(null == iterator){
+ if (null == iterator) {
// loop by day
scheduleDate = DateUtils.getSomeDay(scheduleDate, 1);
- if(scheduleDate.after(endDate)){
+ if (scheduleDate.after(endDate)) {
// all success
logger.info("process {} complement completely!",
processInstance.getId());
break;
}
- }else{
+ } else {
// loop by schedule date
- if(!iterator.hasNext()){
+ if (!iterator.hasNext()) {
// all success
logger.info("process {} complement completely!",
processInstance.getId());
break;
@@ -326,8 +328,8 @@ public class MasterExecThread implements Runnable {
// flow end
// execute next process instance complement data
processInstance.setScheduleTime(scheduleDate);
-
if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
- cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+ if
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
+
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
}
@@ -343,9 +345,9 @@ public class MasterExecThread implements Runnable {
}
}
-
/**
* prepare process parameter
+ *
* @throws Exception exception
*/
private void prepareProcess() throws Exception {
@@ -358,23 +360,22 @@ public class MasterExecThread implements Runnable {
logger.info("prepare process :{} end", processInstance.getId());
}
-
/**
* process end handle
*/
private void endProcess() {
processInstance.setEndTime(new Date());
processService.updateProcessInstance(processInstance);
- if(processInstance.getState().typeIsWaitingThread()){
+ if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null,
processInstance);
}
List<TaskInstance> taskInstances =
processService.findValidTaskListByProcessId(processInstance.getId());
alertManager.sendAlertProcessInstance(processInstance, taskInstances);
}
-
/**
- * generate process dag
+ * generate process dag
+ *
* @throws Exception exception
*/
private void buildFlowDag() throws Exception {
@@ -386,7 +387,7 @@ public class MasterExecThread implements Runnable {
List<String> startNodeNameList =
parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag =
generateFlowDag(processInstance.getProcessInstanceJson(),
startNodeNameList, recoveryNameList,
processInstance.getTaskDependType());
- if(processDag == null){
+ if (processDag == null) {
logger.error("processDag is null");
return;
}
@@ -397,7 +398,7 @@ public class MasterExecThread implements Runnable {
/**
* init task queue
*/
- private void initTaskQueue(){
+ private void initTaskQueue() {
taskFailedSubmit = false;
activeTaskNode.clear();
@@ -405,14 +406,14 @@ public class MasterExecThread implements Runnable {
completeTaskList.clear();
errorTaskList.clear();
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
- for(TaskInstance task : taskInstanceList){
- if(task.isTaskComplete()){
+ for (TaskInstance task : taskInstanceList) {
+ if (task.isTaskComplete()) {
completeTaskList.put(task.getName(), task);
}
- if(task.isConditionsTask() ||
DagHelper.haveConditionsAfterNode(task.getName(), dag)){
+ if (task.isConditionsTask() ||
DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
continue;
}
- if(task.getState().typeIsFailure() && !task.taskCanRetry()){
+ if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
errorTaskList.put(task.getName(), task);
}
}
@@ -441,18 +442,19 @@ public class MasterExecThread implements Runnable {
/**
* submit task to execute
+ *
* @param taskInstance task instance
* @return TaskInstance
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
MasterBaseTaskExecThread abstractExecThread = null;
- if(taskInstance.isSubProcess()){
+ if (taskInstance.isSubProcess()) {
abstractExecThread = new SubProcessTaskExecThread(taskInstance);
- }else if(taskInstance.isDependTask()){
+ } else if (taskInstance.isDependTask()) {
abstractExecThread = new DependentTaskExecThread(taskInstance);
- }else if(taskInstance.isConditionsTask()){
+ } else if (taskInstance.isConditionsTask()) {
abstractExecThread = new ConditionsTaskExecThread(taskInstance);
- }else {
+ } else {
abstractExecThread = new MasterTaskExecThread(taskInstance);
}
Future<Boolean> future = taskExecService.submit(abstractExecThread);
@@ -463,13 +465,14 @@ public class MasterExecThread implements Runnable {
/**
* find task instance in db.
* in case submit more than one same name task in the same time.
+ *
* @param taskName task name
* @return TaskInstance
*/
- private TaskInstance findTaskIfExists(String taskName){
+ private TaskInstance findTaskIfExists(String taskName) {
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(this.processInstance.getId());
- for(TaskInstance taskInstance : taskInstanceList){
- if(taskInstance.getName().equals(taskName)){
+ for (TaskInstance taskInstance : taskInstanceList) {
+ if (taskInstance.getName().equals(taskName)) {
return taskInstance;
}
}
@@ -478,15 +481,16 @@ public class MasterExecThread implements Runnable {
/**
* encapsulation task
- * @param processInstance process instance
- * @param nodeName node name
+ *
+ * @param processInstance process instance
+ * @param nodeName node name
* @return TaskInstance
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance,
String nodeName,
TaskNode taskNode) {
TaskInstance taskInstance = findTaskIfExists(nodeName);
- if(taskInstance == null){
+ if (taskInstance == null) {
taskInstance = new TaskInstance();
// task name
taskInstance.setName(nodeName);
@@ -519,9 +523,9 @@ public class MasterExecThread implements Runnable {
taskInstance.setRetryInterval(taskNode.getRetryInterval());
// task instance priority
- if(taskNode.getTaskInstancePriority() == null){
+ if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
- }else{
+ } else {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
}
@@ -530,7 +534,7 @@ public class MasterExecThread implements Runnable {
String taskWorkerGroup =
StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup :
taskNode.getWorkerGroup();
if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) &&
taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
taskInstance.setWorkerGroup(processWorkerGroup);
- }else {
+ } else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
@@ -540,15 +544,10 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
- /**
- * submit post node
- * @param parentNodeName parent node name
- */
- private Map<String,Object> propToValue = new ConcurrentHashMap<String,
Object>();
- private void submitPostNode(String parentNodeName){
+ private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList =
DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag,
completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
- for(String taskNode : submitTaskNodeList){
+ for (String taskNode : submitTaskNodeList) {
try {
VarPoolUtils.convertVarPoolToMap(propToValue,
processInstance.getVarPool());
} catch (ParseException e) {
@@ -558,23 +557,23 @@ public class MasterExecThread implements Runnable {
TaskNode taskNodeObject = dag.getNode(taskNode);
VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
taskInstances.add(createTaskInstance(processInstance, taskNode,
- taskNodeObject));
+ taskNodeObject));
}
// if previous node success , post node submit
- for(TaskInstance task : taskInstances){
+ for (TaskInstance task : taskInstances) {
- if(readyToSubmitTaskList.containsKey(task.getName())){
+ if (readyToSubmitTaskList.containsKey(task.getName())) {
continue;
}
- if(completeTaskList.containsKey(task.getName())){
+ if (completeTaskList.containsKey(task.getName())) {
logger.info("task {} has already run success", task.getName());
continue;
}
- if(task.getState().typeIsPause() ||
task.getState().typeIsCancel()){
+ if (task.getState().typeIsPause() ||
task.getState().typeIsCancel()) {
logger.info("task {} stopped, the state is {}",
task.getName(), task.getState());
- }else{
+ } else {
addTaskToStandByList(task);
}
}
@@ -582,36 +581,37 @@ public class MasterExecThread implements Runnable {
/**
* determine whether the dependencies of the task node are complete
+ *
* @return DependResult
*/
private DependResult isTaskDepsComplete(String taskName) {
Collection<String> startNodes = dag.getBeginNode();
// if vertex,returns true directly
- if(startNodes.contains(taskName)){
+ if (startNodes.contains(taskName)) {
return DependResult.SUCCESS;
}
TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList();
- for(String depsNode : depNameList ){
- if(!dag.containsNode(depsNode)
+ for (String depsNode : depNameList) {
+ if (!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
- || skipTaskNodeList.containsKey(depsNode)){
+ || skipTaskNodeList.containsKey(depsNode)) {
continue;
}
// dependencies must be fully completed
- if(!completeTaskList.containsKey(depsNode)){
+ if (!completeTaskList.containsKey(depsNode)) {
return DependResult.WAITING;
}
ExecutionStatus depTaskState =
completeTaskList.get(depsNode).getState();
- if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
+ if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.WAITING;
}
// ignore task state if current task is condition
- if(taskNode.isConditionsTask()){
+ if (taskNode.isConditionsTask()) {
continue;
}
- if(!dependTaskSuccess(depsNode, taskName)){
+ if (!dependTaskSuccess(depsNode, taskName)) {
return DependResult.FAILED;
}
}
@@ -621,20 +621,17 @@ public class MasterExecThread implements Runnable {
/**
* depend node is completed, but here need check the condition task branch
is the next node
- * @param dependNodeName
- * @param nextNodeName
- * @return
*/
- private boolean dependTaskSuccess(String dependNodeName, String
nextNodeName){
- if(dag.getNode(dependNodeName).isConditionsTask()){
+ private boolean dependTaskSuccess(String dependNodeName, String
nextNodeName) {
+ if (dag.getNode(dependNodeName).isConditionsTask()) {
//condition task need check the branch to run
List<String> nextTaskList =
DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag,
completeTaskList);
- if(!nextTaskList.contains(nextNodeName)){
+ if (!nextTaskList.contains(nextNodeName)) {
return false;
}
- }else {
+ } else {
ExecutionStatus depTaskState =
completeTaskList.get(dependNodeName).getState();
- if(depTaskState.typeIsFailure()){
+ if (depTaskState.typeIsFailure()) {
return false;
}
}
@@ -643,13 +640,14 @@ public class MasterExecThread implements Runnable {
/**
* query task instance by complete state
+ *
* @param state state
* @return task instance list
*/
- private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state){
+ private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state) {
List<TaskInstance> resultList = new ArrayList<>();
- for (Map.Entry<String, TaskInstance> entry:
completeTaskList.entrySet()) {
- if(entry.getValue().getState() == state){
+ for (Map.Entry<String, TaskInstance> entry :
completeTaskList.entrySet()) {
+ if (entry.getValue().getState() == state) {
resultList.add(entry.getValue());
}
}
@@ -657,18 +655,19 @@ public class MasterExecThread implements Runnable {
}
/**
- * where there are ongoing tasks
+ * where there are ongoing tasks
+ *
* @param state state
* @return ExecutionStatus
*/
- private ExecutionStatus runningState(ExecutionStatus state){
+ private ExecutionStatus runningState(ExecutionStatus state) {
if (state == ExecutionStatus.READY_STOP
|| state == ExecutionStatus.READY_PAUSE
|| state == ExecutionStatus.WAITTING_THREAD
|| state == ExecutionStatus.DELAY_EXECUTION) {
// if the running task is not completed, the state remains
unchanged
return state;
- }else{
+ } else {
return ExecutionStatus.RUNNING_EXECUTION;
}
}
@@ -678,12 +677,12 @@ public class MasterExecThread implements Runnable {
*
* @return Boolean whether has failed task
*/
- private boolean hasFailedTask(){
+ private boolean hasFailedTask() {
- if(this.taskFailedSubmit){
+ if (this.taskFailedSubmit) {
return true;
}
- if(this.errorTaskList.size() > 0){
+ if (this.errorTaskList.size() > 0) {
return true;
}
return this.dependFailedTask.size() > 0;
@@ -694,9 +693,9 @@ public class MasterExecThread implements Runnable {
*
* @return Boolean whether process instance failed
*/
- private boolean processFailed(){
- if(hasFailedTask()) {
- if(processInstance.getFailureStrategy() == FailureStrategy.END){
+ private boolean processFailed() {
+ if (hasFailedTask()) {
+ if (processInstance.getFailureStrategy() == FailureStrategy.END) {
return true;
}
if (processInstance.getFailureStrategy() ==
FailureStrategy.CONTINUE) {
@@ -708,9 +707,10 @@ public class MasterExecThread implements Runnable {
/**
* whether task for waiting thread
+ *
* @return Boolean whether has waiting thread task
*/
- private boolean hasWaitingThreadTask(){
+ private boolean hasWaitingThreadTask() {
List<TaskInstance> waitingList =
getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD);
return CollectionUtils.isNotEmpty(waitingList);
}
@@ -720,74 +720,75 @@ public class MasterExecThread implements Runnable {
* 1,failed retry task in the preparation queue , returns to failure
directly
* 2,exists pause task,complement not completed, pending submission of
tasks, return to suspension
* 3,success
+ *
* @return ExecutionStatus
*/
- private ExecutionStatus processReadyPause(){
- if(hasRetryTaskInStandBy()){
+ private ExecutionStatus processReadyPause() {
+ if (hasRetryTaskInStandBy()) {
return ExecutionStatus.FAILURE;
}
List<TaskInstance> pauseList =
getCompleteTaskByState(ExecutionStatus.PAUSE);
- if(CollectionUtils.isNotEmpty(pauseList)
+ if (CollectionUtils.isNotEmpty(pauseList)
|| !isComplementEnd()
- || readyToSubmitTaskList.size() > 0){
+ || readyToSubmitTaskList.size() > 0) {
return ExecutionStatus.PAUSE;
- }else{
+ } else {
return ExecutionStatus.SUCCESS;
}
}
-
/**
* generate the latest process instance status by the tasks state
+ *
* @return process instance execution status
*/
- private ExecutionStatus getProcessInstanceState(){
+ private ExecutionStatus getProcessInstanceState() {
ProcessInstance instance =
processService.findProcessInstanceById(processInstance.getId());
ExecutionStatus state = instance.getState();
- if(activeTaskNode.size() > 0 || hasRetryTaskInStandBy()){
+ if (activeTaskNode.size() > 0 || hasRetryTaskInStandBy()) {
// active task and retry task exists
return runningState(state);
}
// process failure
- if(processFailed()){
+ if (processFailed()) {
return ExecutionStatus.FAILURE;
}
// waiting thread
- if(hasWaitingThreadTask()){
+ if (hasWaitingThreadTask()) {
return ExecutionStatus.WAITTING_THREAD;
}
// pause
- if(state == ExecutionStatus.READY_PAUSE){
+ if (state == ExecutionStatus.READY_PAUSE) {
return processReadyPause();
}
// stop
- if(state == ExecutionStatus.READY_STOP){
+ if (state == ExecutionStatus.READY_STOP) {
List<TaskInstance> stopList =
getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList =
getCompleteTaskByState(ExecutionStatus.KILL);
- if(CollectionUtils.isNotEmpty(stopList)
+ if (CollectionUtils.isNotEmpty(stopList)
|| CollectionUtils.isNotEmpty(killList)
- || !isComplementEnd()){
+ || !isComplementEnd()) {
return ExecutionStatus.STOP;
- }else{
+ } else {
return ExecutionStatus.SUCCESS;
}
}
// success
- if(state == ExecutionStatus.RUNNING_EXECUTION){
+ if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks =
getCompleteTaskByState(ExecutionStatus.KILL);
- if(readyToSubmitTaskList.size() > 0){
+ if (readyToSubmitTaskList.size() > 0) {
//tasks currently pending submission, no retries, indicating
that depend is waiting to complete
return ExecutionStatus.RUNNING_EXECUTION;
- }else if(CollectionUtils.isNotEmpty(killTasks)){
+ } else if (CollectionUtils.isNotEmpty(killTasks)) {
// tasks maybe killed manually
return ExecutionStatus.FAILURE;
- }else{
+ } else {
// if the waiting queue is empty and the status is in
progress, then success
return ExecutionStatus.SUCCESS;
}
@@ -798,15 +799,14 @@ public class MasterExecThread implements Runnable {
/**
* whether standby task list have retry tasks
- * @return
*/
private boolean retryTaskExists() {
boolean result = false;
- for(String taskName : readyToSubmitTaskList.keySet()){
+ for (String taskName : readyToSubmitTaskList.keySet()) {
TaskInstance task = readyToSubmitTaskList.get(taskName);
- if(task.getState().typeIsFailure()){
+ if (task.getState().typeIsFailure()) {
result = true;
break;
}
@@ -816,10 +816,11 @@ public class MasterExecThread implements Runnable {
/**
* whether complement end
+ *
* @return Boolean whether is complement end
*/
private boolean isComplementEnd() {
- if(!processInstance.isComplementData()){
+ if (!processInstance.isComplementData()) {
return true;
}
@@ -828,7 +829,7 @@ public class MasterExecThread implements Runnable {
Date endTime =
DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
return processInstance.getScheduleTime().equals(endTime);
} catch (Exception e) {
- logger.error("complement end failed ",e);
+ logger.error("complement end failed ", e);
return false;
}
}
@@ -839,7 +840,7 @@ public class MasterExecThread implements Runnable {
*/
private void updateProcessInstanceState() {
ExecutionStatus state = getProcessInstanceState();
- if(processInstance.getState() != state){
+ if (processInstance.getState() != state) {
logger.info(
"work flow process instance [id: {}, name:{}], state
change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(),
@@ -856,38 +857,42 @@ public class MasterExecThread implements Runnable {
/**
* get task dependency result
+ *
* @param taskInstance task instance
* @return DependResult
*/
- private DependResult getDependResultForTask(TaskInstance taskInstance){
+ private DependResult getDependResultForTask(TaskInstance taskInstance) {
return isTaskDepsComplete(taskInstance.getName());
}
/**
* add task to standby list
+ *
* @param taskInstance task instance
*/
- private void addTaskToStandByList(TaskInstance taskInstance){
+ private void addTaskToStandByList(TaskInstance taskInstance) {
logger.info("add task to stand by list: {}", taskInstance.getName());
readyToSubmitTaskList.putIfAbsent(taskInstance.getName(),
taskInstance);
}
/**
* remove task from stand by list
+ *
* @param taskInstance task instance
*/
- private void removeTaskFromStandbyList(TaskInstance taskInstance){
+ private void removeTaskFromStandbyList(TaskInstance taskInstance) {
logger.info("remove task from stand by list: {}",
taskInstance.getName());
readyToSubmitTaskList.remove(taskInstance.getName());
}
/**
* has retry task in standby
+ *
* @return Boolean whether has retry task in standby
*/
- private boolean hasRetryTaskInStandBy(){
- for (Map.Entry<String, TaskInstance> entry:
readyToSubmitTaskList.entrySet()) {
- if(entry.getValue().getState().typeIsFailure()){
+ private boolean hasRetryTaskInStandBy() {
+ for (Map.Entry<String, TaskInstance> entry :
readyToSubmitTaskList.entrySet()) {
+ if (entry.getValue().getState().typeIsFailure()) {
return true;
}
}
@@ -897,44 +902,44 @@ public class MasterExecThread implements Runnable {
/**
* submit and watch the tasks, until the work flow stop
*/
- private void runProcess(){
+ private void runProcess() {
// submit start node
submitPostNode(null);
boolean sendTimeWarning = false;
- while(!processInstance.isProcessInstanceStop() && Stopper.isRunning()){
+ while (!processInstance.isProcessInstanceStop() &&
Stopper.isRunning()) {
// send warning email if process time out.
- if(!sendTimeWarning && checkProcessTimeOut(processInstance) ){
+ if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
alertManager.sendProcessTimeoutAlert(processInstance,
processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
sendTimeWarning = true;
}
- for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry:
activeTaskNode.entrySet()) {
+ for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry :
activeTaskNode.entrySet()) {
Future<Boolean> future = entry.getValue();
- TaskInstance task = entry.getKey().getTaskInstance();
+ TaskInstance task = entry.getKey().getTaskInstance();
- if(!future.isDone()){
+ if (!future.isDone()) {
continue;
}
// node monitor thread complete
task = this.processService.findTaskInstanceById(task.getId());
- if(task == null){
+ if (task == null) {
this.taskFailedSubmit = true;
activeTaskNode.remove(entry.getKey());
continue;
}
// node monitor thread complete
- if(task.getState().typeIsFinished()){
+ if (task.getState().typeIsFinished()) {
activeTaskNode.remove(entry.getKey());
}
logger.info("task :{}, id:{} complete, state is {} ",
task.getName(), task.getId(), task.getState());
// node success , post node submit
- if(task.getState() == ExecutionStatus.SUCCESS){
+ if (task.getState() == ExecutionStatus.SUCCESS) {
processInstance.setVarPool(task.getVarPool());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);
@@ -942,20 +947,20 @@ public class MasterExecThread implements Runnable {
continue;
}
// node fails, retry first, and then execute the failure
process
- if(task.getState().typeIsFailure()){
- if(task.getState() ==
ExecutionStatus.NEED_FAULT_TOLERANCE){
+ if (task.getState().typeIsFailure()) {
+ if (task.getState() ==
ExecutionStatus.NEED_FAULT_TOLERANCE) {
this.recoverToleranceFaultTaskList.add(task);
}
- if(task.taskCanRetry()){
+ if (task.taskCanRetry()) {
addTaskToStandByList(task);
- }else{
+ } else {
completeTaskList.put(task.getName(), task);
- if( task.isConditionsTask()
- ||
DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
+ if (task.isConditionsTask()
+ ||
DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
submitPostNode(task.getName());
- }else{
+ } else {
errorTaskList.put(task.getName(), task);
- if(processInstance.getFailureStrategy() ==
FailureStrategy.END){
+ if (processInstance.getFailureStrategy() ==
FailureStrategy.END) {
killTheOtherTasks();
}
}
@@ -966,30 +971,30 @@ public class MasterExecThread implements Runnable {
completeTaskList.put(task.getName(), task);
}
// send alert
- if(CollectionUtils.isNotEmpty(this.recoverToleranceFaultTaskList)){
+ if
(CollectionUtils.isNotEmpty(this.recoverToleranceFaultTaskList)) {
alertManager.sendAlertWorkerToleranceFault(processInstance,
recoverToleranceFaultTaskList);
this.recoverToleranceFaultTaskList.clear();
}
// updateProcessInstance completed task status
// failure priority is higher than pause
// if a task fails, other suspended tasks need to be reset kill
- if(errorTaskList.size() > 0){
- for(Map.Entry<String, TaskInstance> entry:
completeTaskList.entrySet()) {
+ if (errorTaskList.size() > 0) {
+ for (Map.Entry<String, TaskInstance> entry :
completeTaskList.entrySet()) {
TaskInstance completeTask = entry.getValue();
- if(completeTask.getState()== ExecutionStatus.PAUSE){
+ if (completeTask.getState() == ExecutionStatus.PAUSE) {
completeTask.setState(ExecutionStatus.KILL);
completeTaskList.put(entry.getKey(), completeTask);
processService.updateTaskInstance(completeTask);
}
}
}
- if(canSubmitTaskToQueue()){
+ if (canSubmitTaskToQueue()) {
submitStandByTask();
}
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {
- logger.error(e.getMessage(),e);
+ logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
updateProcessInstanceState();
@@ -1000,29 +1005,30 @@ public class MasterExecThread implements Runnable {
/**
* whether check process time out
+ *
* @param processInstance task instance
* @return true if time out of process instance > running time of process
instance
*/
private boolean checkProcessTimeOut(ProcessInstance processInstance) {
- if(processInstance.getTimeout() == 0 ){
+ if (processInstance.getTimeout() == 0) {
return false;
}
Date now = new Date();
- long runningTime = DateUtils.diffMin(now,
processInstance.getStartTime());
+ long runningTime = DateUtils.diffMin(now,
processInstance.getStartTime());
return runningTime > processInstance.getTimeout();
}
/**
* whether can submit task to queue
+ *
* @return boolean
*/
private boolean canSubmitTaskToQueue() {
return OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory());
}
-
/**
* close the on going tasks
*/
@@ -1036,7 +1042,7 @@ public class MasterExecThread implements Runnable {
TaskInstance taskInstance = taskExecThread.getTaskInstance();
taskInstance =
processService.findTaskInstanceById(taskInstance.getId());
- if(taskInstance != null &&
taskInstance.getState().typeIsFinished()){
+ if (taskInstance != null &&
taskInstance.getState().typeIsFinished()) {
continue;
}
@@ -1052,16 +1058,19 @@ public class MasterExecThread implements Runnable {
/**
* whether the retry interval is timed out
+ *
* @param taskInstance task instance
* @return Boolean
*/
- private boolean retryTaskIntervalOverTime(TaskInstance taskInstance){
- if(taskInstance.getState() != ExecutionStatus.FAILURE){
+ private boolean retryTaskIntervalOverTime(TaskInstance taskInstance) {
+ if (taskInstance.getState() != ExecutionStatus.FAILURE) {
return true;
}
- if(taskInstance.getId() == 0 ||
- taskInstance.getMaxRetryTimes() ==0 ||
- taskInstance.getRetryInterval() == 0 ){
+ if (taskInstance.getId() == 0
+ ||
+ taskInstance.getMaxRetryTimes() == 0
+ ||
+ taskInstance.getRetryInterval() == 0) {
return true;
}
Date now = new Date();
@@ -1073,62 +1082,64 @@ public class MasterExecThread implements Runnable {
/**
* handling the list of tasks to be submitted
*/
- private void submitStandByTask(){
- for(Map.Entry<String, TaskInstance> entry:
readyToSubmitTaskList.entrySet()) {
+ private void submitStandByTask() {
+ for (Map.Entry<String, TaskInstance> entry :
readyToSubmitTaskList.entrySet()) {
TaskInstance task = entry.getValue();
DependResult dependResult = getDependResultForTask(task);
- if(DependResult.SUCCESS == dependResult){
- if(retryTaskIntervalOverTime(task)){
+ if (DependResult.SUCCESS == dependResult) {
+ if (retryTaskIntervalOverTime(task)) {
submitTaskExec(task);
removeTaskFromStandbyList(task);
}
- }else if(DependResult.FAILED == dependResult){
+ } else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted
and the state changes to failure.
dependFailedTask.put(entry.getKey(), task);
removeTaskFromStandbyList(task);
- logger.info("task {},id:{} depend result : {}",task.getName(),
task.getId(), dependResult);
+ logger.info("task {},id:{} depend result : {}",
task.getName(), task.getId(), dependResult);
}
}
}
/**
* get recovery task instance
+ *
* @param taskId task id
* @return recovery task instance
*/
- private TaskInstance getRecoveryTaskInstance(String taskId){
- if(!StringUtils.isNotEmpty(taskId)){
+ private TaskInstance getRecoveryTaskInstance(String taskId) {
+ if (!StringUtils.isNotEmpty(taskId)) {
return null;
}
try {
Integer intId = Integer.valueOf(taskId);
TaskInstance task = processService.findTaskInstanceById(intId);
- if(task == null){
- logger.error("start node id cannot be found: {}", taskId);
- }else {
+ if (task == null) {
+ logger.error("start node id cannot be found: {}", taskId);
+ } else {
return task;
}
- }catch (Exception e){
- logger.error("get recovery task instance failed ",e);
+ } catch (Exception e) {
+ logger.error("get recovery task instance failed ", e);
}
return null;
}
/**
* get start task instance list
+ *
* @param cmdParam command param
* @return task instance list
*/
- private List<TaskInstance> getStartTaskInstanceList(String cmdParam){
+ private List<TaskInstance> getStartTaskInstanceList(String cmdParam) {
List<TaskInstance> instanceList = new ArrayList<>();
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
- if(paramMap != null &&
paramMap.containsKey(CMDPARAM_RECOVERY_START_NODE_STRING)){
- String[] idList =
paramMap.get(CMDPARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
- for(String nodeId : idList){
+ if (paramMap != null &&
paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
+ String[] idList =
paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
+ for (String nodeId : idList) {
TaskInstance task = getRecoveryTaskInstance(nodeId);
- if(task != null){
+ if (task != null) {
instanceList.add(task);
}
}
@@ -1138,17 +1149,18 @@ public class MasterExecThread implements Runnable {
/**
* parse "StartNodeNameList" from cmd param
+ *
* @param cmdParam command param
* @return start node name list
*/
- private List<String> parseStartNodeName(String cmdParam){
+ private List<String> parseStartNodeName(String cmdParam) {
List<String> startNodeNameList = new ArrayList<>();
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
- if(paramMap == null){
+ if (paramMap == null) {
return startNodeNameList;
}
- if(paramMap.containsKey(CMDPARAM_START_NODE_NAMES)){
- startNodeNameList =
Arrays.asList(paramMap.get(CMDPARAM_START_NODE_NAMES).split(Constants.COMMA));
+ if (paramMap.containsKey(CMD_PARAM_START_NODE_NAMES)) {
+ startNodeNameList =
Arrays.asList(paramMap.get(CMD_PARAM_START_NODE_NAMES).split(Constants.COMMA));
}
return startNodeNameList;
}
@@ -1156,11 +1168,12 @@ public class MasterExecThread implements Runnable {
/**
* generate start node name list from parsing command param;
* if "StartNodeIdList" exists in command param, return StartNodeIdList
+ *
* @return recovery node name list
*/
- private List<String> getRecoveryNodeNameList(){
+ private List<String> getRecoveryNodeNameList() {
List<String> recoveryNodeNameList = new ArrayList<>();
- if(CollectionUtils.isNotEmpty(recoverNodeIdList)) {
+ if (CollectionUtils.isNotEmpty(recoverNodeIdList)) {
for (TaskInstance task : recoverNodeIdList) {
recoveryNodeNameList.add(task.getName());
}
@@ -1170,17 +1183,18 @@ public class MasterExecThread implements Runnable {
/**
* generate flow dag
+ *
* @param processDefinitionJson process definition json
- * @param startNodeNameList start node name list
- * @param recoveryNodeNameList recovery node name list
- * @param depNodeType depend node type
+ * @param startNodeNameList start node name list
+ * @param recoveryNodeNameList recovery node name list
+ * @param depNodeType depend node type
* @return ProcessDag process dag
- * @throws Exception exception
+ * @throws Exception exception
*/
public ProcessDag generateFlowDag(String processDefinitionJson,
List<String> startNodeNameList,
List<String> recoveryNodeNameList,
- TaskDependType depNodeType)throws
Exception{
+ TaskDependType depNodeType) throws
Exception {
return DagHelper.generateFlowDag(processDefinitionJson,
startNodeNameList, recoveryNodeNameList, depNodeType);
}
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index bf1e7e2..6979a93 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -14,19 +14,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.common.enums.*;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterExecThread;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -36,15 +58,6 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ApplicationContext;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.util.*;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.mock;
/**
* test for MasterExecThread
@@ -66,7 +79,7 @@ public class MasterExecThreadTest {
private ApplicationContext applicationContext;
@Before
- public void init() throws Exception{
+ public void init() throws Exception {
processService = mock(ProcessService.class);
applicationContext = mock(ApplicationContext.class);
@@ -92,7 +105,7 @@ public class MasterExecThreadTest {
masterExecThread = PowerMockito.spy(new MasterExecThread(
processInstance
, processService
- ,null, null, config));
+ , null, null, config));
// prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true);
@@ -106,18 +119,17 @@ public class MasterExecThreadTest {
/**
* without schedule
- * @throws ParseException
*/
@Test
public void testParallelWithOutSchedule() throws ParseException {
- try{
+ try {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Method method =
MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 1-30 for next save, and last day 20 no save
verify(processService,
times(20)).saveProcessInstance(processInstance);
- }catch (Exception e){
+ } catch (Exception e) {
e.printStackTrace();
Assert.fail();
}
@@ -125,27 +137,86 @@ public class MasterExecThreadTest {
/**
* with schedule
- * @throws ParseException
*/
@Test
- public void testParallelWithSchedule() throws ParseException {
- try{
+ public void testParallelWithSchedule() {
+ try {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Method method =
MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 9(1 to 20 step 2) for next save, and last
day 31 no save
verify(processService,
times(9)).saveProcessInstance(processInstance);
- }catch (Exception e){
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testParseStartNodeName() throws ParseException {
+ try {
+ Map<String, String> cmdParam = new HashMap<>();
+ cmdParam.put(CMD_PARAM_START_NODE_NAMES, "t1,t2,t3");
+
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam));
+ Class<MasterExecThread> masterExecThreadClass =
MasterExecThread.class;
+ Method method =
masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class);
+ method.setAccessible(true);
+ List<String> nodeNames = (List<String>)
method.invoke(masterExecThread, JSONUtils.toJsonString(cmdParam));
+ Assert.assertEquals(3, nodeNames.size());
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testRetryTaskIntervalOverTime() {
+ try {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(0);
+ taskInstance.setMaxRetryTimes(0);
+ taskInstance.setRetryInterval(0);
+ taskInstance.setState(ExecutionStatus.FAILURE);
+ Class<MasterExecThread> masterExecThreadClass =
MasterExecThread.class;
+ Method method =
masterExecThreadClass.getDeclaredMethod("retryTaskIntervalOverTime",
TaskInstance.class);
+ method.setAccessible(true);
+ Assert.assertTrue((Boolean) method.invoke(masterExecThread,
taskInstance));
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testGetStartTaskInstanceList() {
+ try {
+ TaskInstance taskInstance1 = new TaskInstance();
+ taskInstance1.setId(1);
+ TaskInstance taskInstance2 = new TaskInstance();
+ taskInstance2.setId(2);
+ TaskInstance taskInstance3 = new TaskInstance();
+ taskInstance3.setId(3);
+ TaskInstance taskInstance4 = new TaskInstance();
+ taskInstance4.setId(4);
+ Map<String, String> cmdParam = new HashMap<>();
+ cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
+
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance1);
+
Mockito.when(processService.findTaskInstanceById(2)).thenReturn(taskInstance2);
+
Mockito.when(processService.findTaskInstanceById(3)).thenReturn(taskInstance3);
+
Mockito.when(processService.findTaskInstanceById(4)).thenReturn(taskInstance4);
+ Class<MasterExecThread> masterExecThreadClass =
MasterExecThread.class;
+ Method method =
masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList",
String.class);
+ method.setAccessible(true);
+ List<TaskInstance> taskInstances = (List<TaskInstance>)
method.invoke(masterExecThread, JSONUtils.toJsonString(cmdParam));
+ Assert.assertEquals(4, taskInstances.size());
+ } catch (Exception e) {
Assert.fail();
}
}
- private List<Schedule> zeroSchedulerList(){
+ private List<Schedule> zeroSchedulerList() {
return Collections.EMPTY_LIST;
}
- private List<Schedule> oneSchedulerList(){
+ private List<Schedule> oneSchedulerList() {
List<Schedule> schedulerList = new LinkedList<>();
Schedule schedule = new Schedule();
schedule.setCrontab("0 0 0 1/2 * ?");
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 38f9573..6262985 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.service.process;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_EMPTY_SUB_PROCESS;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
-import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
@@ -266,14 +266,14 @@ public class ProcessService {
if (cmdTypeMap.containsKey(commandType)) {
ObjectNode cmdParamObj =
JSONUtils.parseObject(command.getCommandParam());
- int processInstanceId =
cmdParamObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt();
+ int processInstanceId =
cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
List<Command> commands = commandMapper.selectList(null);
// for all commands
for (Command tmpCommand : commands) {
if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
ObjectNode tempObj =
JSONUtils.parseObject(tmpCommand.getCommandParam());
- if (tempObj != null && processInstanceId ==
tempObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
+ if (tempObj != null && processInstanceId ==
tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
isNeedCreate = false;
break;
}
@@ -439,7 +439,7 @@ public class ProcessService {
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
ObjectNode parameterJson = JSONUtils.parseObject(parameter);
- if (parameterJson.get(CMDPARAM_SUB_PROCESS_DEFINE_ID) != null)
{
+ if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_ID) !=
null) {
SubProcessParameters subProcessParam =
JSONUtils.parseObject(parameter, SubProcessParameters.class);
ids.add(subProcessParam.getProcessDefinitionId());
recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids);
@@ -468,7 +468,7 @@ public class ProcessService {
return;
}
Map<String, String> cmdParam = new HashMap<>();
- cmdParam.put(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD,
String.valueOf(processInstance.getId()));
+ cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD,
String.valueOf(processInstance.getId()));
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
@@ -613,8 +613,8 @@ public class ProcessService {
private Boolean checkCmdParam(Command command, Map<String, String>
cmdParam) {
if (command.getTaskDependType() == TaskDependType.TASK_ONLY ||
command.getTaskDependType() == TaskDependType.TASK_PRE) {
if (cmdParam == null
- || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES)
- ||
cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()) {
+ || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
+ ||
cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
logger.error("command node depend type is {}, but start nodes
is null ", command.getTaskDependType());
return false;
}
@@ -647,20 +647,20 @@ public class ProcessService {
if (cmdParam != null) {
Integer processInstanceId = 0;
// recover from failure or pause tasks
- if
(cmdParam.containsKey(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING)) {
- String processId =
cmdParam.get(Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING);
+ if
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
+ String processId =
cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
processInstanceId = Integer.parseInt(processId);
if (processInstanceId == 0) {
logger.error("command parameter is error, [
ProcessInstanceId ] is 0");
return null;
}
- } else if (cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)) {
+ } else if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
// sub process map
- String pId = cmdParam.get(Constants.CMDPARAM_SUB_PROCESS);
+ String pId = cmdParam.get(Constants.CMD_PARAM_SUB_PROCESS);
processInstanceId = Integer.parseInt(pId);
- } else if
(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD)) {
+ } else if
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD)) {
// waiting thread command
- String pId =
cmdParam.get(Constants.CMDPARAM_RECOVERY_WAITTING_THREAD);
+ String pId =
cmdParam.get(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD);
processInstanceId = Integer.parseInt(pId);
}
if (processInstanceId == 0) {
@@ -681,7 +681,7 @@ public class ProcessService {
}
}
// reset command parameter if sub process
- if (cmdParam.containsKey(Constants.CMDPARAM_SUB_PROCESS)) {
+ if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
processInstance.setCommandParam(command.getCommandParam());
}
} else {
@@ -708,14 +708,14 @@ public class ProcessService {
List<Integer> failedList =
this.findTaskIdByInstanceState(processInstance.getId(),
ExecutionStatus.FAILURE);
List<Integer> toleranceList =
this.findTaskIdByInstanceState(processInstance.getId(),
ExecutionStatus.NEED_FAULT_TOLERANCE);
List<Integer> killedList =
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
- cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
failedList.addAll(killedList);
failedList.addAll(toleranceList);
for (Integer taskId : failedList) {
initTaskInstance(this.findTaskInstanceById(taskId));
}
- cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
+ cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA,
convertIntListToString(failedList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
@@ -726,7 +726,7 @@ public class ProcessService {
break;
case RECOVER_SUSPENDED_PROCESS:
// find pause tasks and init task's state
- cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList =
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList =
findTaskIdByInstanceState(processInstance.getId(),
ExecutionStatus.KILL);
@@ -735,7 +735,7 @@ public class ProcessService {
// initialize the pause state
initTaskInstance(this.findTaskInstanceById(taskId));
}
- cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
String.join(",", convertIntListToString(suspendedNodeList)));
+ cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(",", convertIntListToString(suspendedNodeList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@@ -755,8 +755,8 @@ public class ProcessService {
break;
case REPEAT_RUNNING:
// delete the recover task names from command parameter
- if
(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)) {
-
cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
+ if
(cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
+
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
}
// delete all the valid tasks when repeat running
@@ -835,16 +835,16 @@ public class ProcessService {
}
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
// write sub process id into cmd param.
- if (paramMap.containsKey(CMDPARAM_SUB_PROCESS)
- &&
CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))) {
- paramMap.remove(CMDPARAM_SUB_PROCESS);
- paramMap.put(CMDPARAM_SUB_PROCESS,
String.valueOf(subProcessInstance.getId()));
+ if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
+ &&
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+ paramMap.remove(CMD_PARAM_SUB_PROCESS);
+ paramMap.put(CMD_PARAM_SUB_PROCESS,
String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
subProcessInstance.setIsSubProcess(Flag.YES);
this.saveProcessInstance(subProcessInstance);
}
// copy parent instance user def params to sub process..
- String parentInstanceId =
paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
+ String parentInstanceId =
paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
if (StringUtils.isNotEmpty(parentInstanceId)) {
ProcessInstance parentInstance =
findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
if (parentInstance != null) {
@@ -1058,7 +1058,7 @@ public class ProcessService {
CommandType commandType = getSubCommandType(parentProcessInstance,
childInstance);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(),
TaskNode.class);
Map<String, String> subProcessParam =
JSONUtils.toMap(taskNode.getParams());
- Integer childDefineId =
Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
+ Integer childDefineId =
Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
String processParam = getSubWorkFlowParam(instanceMap,
parentProcessInstance);
return new Command(
@@ -1443,7 +1443,7 @@ public class ProcessService {
* @return create process instance result
*/
public int createWorkProcessInstanceMap(ProcessInstanceMap
processInstanceMap) {
- Integer count = 0;
+ int count = 0;
if (processInstanceMap != null) {
return processInstanceMapMapper.insert(processInstanceMap);
}
@@ -1647,7 +1647,7 @@ public class ProcessService {
//2 insert into recover command
Command cmd = new Command();
cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
- cmd.setCommandParam(String.format("{\"%s\":%d}",
Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
+ cmd.setCommandParam(String.format("{\"%s\":%d}",
Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
createCommand(cmd);
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 74b52bb..4ac91f0 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -17,30 +17,75 @@
package org.apache.dolphinscheduler.service.process;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
/**
* process service test
*/
+@RunWith(MockitoJUnitRunner.class)
public class ProcessServiceTest {
+ private static final Logger logger =
LoggerFactory.getLogger(CronUtilsTest.class);
+
+ @InjectMocks
+ private ProcessService processService;
+
+
+ @Mock
+ private CommandMapper commandMapper;
+
+
+ @Mock
+ private ErrorCommandMapper errorCommandMapper;
+
+ @Mock
+ private ProcessDefinitionMapper processDefineMapper;
+ @Mock
+ private ProcessInstanceMapper processInstanceMapper;
+ @Mock
+ private UserMapper userMapper;
+ @Mock
+ TaskInstanceMapper taskInstanceMapper;
+
@Test
public void testCreateSubCommand() {
ProcessService processService = new ProcessService();
@@ -89,7 +134,7 @@ public class ProcessServiceTest {
String endString = "2020-01-10 00:00:00";
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
- Map<String,String> complementMap = new HashMap<>();
+ Map<String, String> complementMap = new HashMap<>();
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE,
startString);
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE,
endString);
parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
@@ -113,4 +158,168 @@ public class ProcessServiceTest {
);
Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS,
command.getCommandType());
}
+
+ @Test
+ public void testVerifyIsNeedCreateCommand() {
+
+ List<Command> commands = new ArrayList<>();
+
+ Command command = new Command();
+ command.setCommandType(CommandType.REPEAT_RUNNING);
+ command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING +
"\":\"111\"}");
+ commands.add(command);
+ Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
+ Assert.assertFalse(processService.verifyIsNeedCreateCommand(command));
+
+ Command command1 = new Command();
+ command1.setCommandType(CommandType.REPEAT_RUNNING);
+ command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING +
"\":\"222\"}");
+ Assert.assertTrue(processService.verifyIsNeedCreateCommand(command1));
+
+ Command command2 = new Command();
+ command2.setCommandType(CommandType.PAUSE);
+ Assert.assertTrue(processService.verifyIsNeedCreateCommand(command2));
+ }
+
+ @Test
+ public void testCreateRecoveryWaitingThreadCommand() {
+
+ int id = 123;
+ Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
+ ProcessInstance subProcessInstance = new ProcessInstance();
+ subProcessInstance.setIsSubProcess(Flag.YES);
+ Command originCommand = new Command();
+ originCommand.setId(id);
+ processService.createRecoveryWaitingThreadCommand(originCommand,
subProcessInstance);
+
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(111);
+ processService.createRecoveryWaitingThreadCommand(null,
subProcessInstance);
+
+ Command recoverCommand = new Command();
+ recoverCommand.setCommandType(CommandType.RECOVER_WAITTING_THREAD);
+ processService.createRecoveryWaitingThreadCommand(recoverCommand,
subProcessInstance);
+
+ Command repeatRunningCommand = new Command();
+ recoverCommand.setCommandType(CommandType.REPEAT_RUNNING);
+
processService.createRecoveryWaitingThreadCommand(repeatRunningCommand,
subProcessInstance);
+
+ ProcessInstance subProcessInstance2 = new ProcessInstance();
+ subProcessInstance2.setId(111);
+ subProcessInstance2.setIsSubProcess(Flag.NO);
+
processService.createRecoveryWaitingThreadCommand(repeatRunningCommand,
subProcessInstance2);
+
+ }
+
+ @Test
+ public void testHandleCommand() {
+
+ //cannot construct process instance, return null;
+ String host = "127.0.0.1";
+ int validThreadNum = 1;
+ Command command = new Command();
+ command.setProcessDefinitionId(222);
+ command.setCommandType(CommandType.REPEAT_RUNNING);
+ command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING +
"\":\"111\",\""
+ + CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}");
+
Mockito.when(processDefineMapper.selectById(command.getProcessDefinitionId())).thenReturn(null);
+ Assert.assertNull(processService.handleCommand(logger, host,
validThreadNum, command));
+
+ //there is not enough thread for this command
+ Command command1 = new Command();
+ command1.setProcessDefinitionId(123);
+ command1.setCommandParam("{\"ProcessInstanceId\":222}");
+ command1.setCommandType(CommandType.START_PROCESS);
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(123);
+ processDefinition.setName("test");
+ processDefinition.setVersion(1);
+
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ +
"{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ +
",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo
\\\"123123\\\"\",\"resourceList\":[]}"
+ +
",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ +
",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ +
",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(222);
+
Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition);
+
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
+ Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command1));
+
+ Command command2 = new Command();
+
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
+ command2.setProcessDefinitionId(123);
+ command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
+
+ Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command2));
+
+ Command command3 = new Command();
+ command3.setProcessDefinitionId(123);
+ command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
+ command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command3));
+
+ Command command4 = new Command();
+ command4.setProcessDefinitionId(123);
+
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
+ command4.setCommandType(CommandType.REPEAT_RUNNING);
+ Assert.assertNotNull(processService.handleCommand(logger, host,
validThreadNum, command4));
+ }
+
+ @Test
+ public void testGetUserById() {
+ User user = new User();
+ user.setId(123);
+ Mockito.when(userMapper.selectById(123)).thenReturn(user);
+ Assert.assertEquals(user, processService.getUserById(123));
+ }
+
+ @Test
+ public void testFormatTaskAppId() {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(333);
+ taskInstance.setProcessDefinitionId(111);
+ taskInstance.setProcessInstanceId(222);
+
Mockito.when(processService.findProcessDefineById(taskInstance.getProcessDefinitionId())).thenReturn(null);
+
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(null);
+ Assert.assertEquals("", processService.formatTaskAppId(taskInstance));
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(111);
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(222);
+
Mockito.when(processService.findProcessDefineById(taskInstance.getProcessDefinitionId())).thenReturn(processDefinition);
+
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
+ Assert.assertEquals("111_222_333",
processService.formatTaskAppId(taskInstance));
+
+ }
+
+ @Test
+ public void testRecurseFindSubProcessId() {
+ ProcessDefinition processDefinition = new ProcessDefinition();
+
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ +
"{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\""
+ +
",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\""
+ +
",\"maxRetryTimes\":\"0\",\"name\":\"test\",\"params\":{\"localParams\":[],"
+ + "\"rawScript\":\"echo
\\\"123123\\\"\",\"resourceList\":[],\"processDefinitionId\""
+ +
":\"222\"},\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\","
+ +
"\"taskInstancePriority\":\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":"
+ +
"null,\"strategy\":\"\"},\"type\":\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],"
+ + "\"tenantId\":4,\"timeout\":0}");
+ int parentId = 111;
+ List<Integer> ids = new ArrayList<>();
+ ProcessDefinition processDefinition2 = new ProcessDefinition();
+
processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\""
+ +
":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ +
"\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\","
+ + "\"params\":{\"localParams\":[],\"rawScript\":\"echo
\\\"123123\\\"\",\"resourceList\":[]},"
+ +
"\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":"
+ +
"\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":"
+ +
"\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+
Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
+
Mockito.when(processDefineMapper.selectById(222)).thenReturn(processDefinition2);
+ processService.recurseFindSubProcessId(parentId, ids);
+
+ }
}