This is an automated email from the ASF dual-hosted git repository. zhoujieguang pushed a commit to branch feat/task-plugin-dynamic in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 784155ce845ba800e14f57836adaeebedfa7a3c0 Author: JieguangZhou <[email protected]> AuthorDate: Tue May 23 18:24:51 2023 +0800 add dynamic task plugin --- .../api/controller/ProcessInstanceController.java | 23 ++ .../api/dto/DynamicSubWorkflowDto.java | 24 ++ .../apache/dolphinscheduler/api/enums/Status.java | 2 + .../api/service/ProcessInstanceService.java | 4 + .../service/impl/ProcessInstanceServiceImpl.java | 68 +++++ .../src/main/resources/task-type-config.yaml | 1 + .../common/constants/CommandKeyConstants.java | 2 + .../dolphinscheduler/common/enums/CommandType.java | 1 + .../common/enums/WorkflowExecutionStatus.java | 1 + .../utils/placeholder/BusinessTimeUtils.java | 1 + .../dao/entity/RelationSubWorkflow.java | 50 ++++ .../dolphinscheduler/dao/entity/TaskInstance.java | 5 + .../dao/mapper/RelationSubWorkflowMapper.java | 40 +++ .../dao/repository/ProcessInstanceDao.java | 2 + .../repository/impl/ProcessInstanceDaoImpl.java | 9 + .../dao/mapper/RelationSubWorkflowMapper.xml | 44 ++++ .../main/resources/sql/dolphinscheduler_mysql.sql | 13 + .../master/event/TaskTimeoutStateEventHandler.java | 3 +- .../master/runner/WorkflowExecuteRunnable.java | 24 +- .../MasterTaskExecuteRunnableFactoryBuilder.java | 4 +- .../dynamic/DynamicAsyncTaskExecuteFunction.java | 178 +++++++++++++ .../runner/task/dynamic/DynamicCommandUtils.java | 68 +++++ .../runner/task/dynamic/DynamicLogicTask.java | 290 +++++++++++++++++++++ .../dynamic/DynamicLogicTaskPluginFactory.java | 72 +++++ .../master/runner/task/dynamic/DynamicOutput.java | 16 ++ .../task/subworkflow/SubWorkflowLogicTask.java | 4 +- .../server/master/utils/TaskUtils.java | 4 +- .../service/process/ProcessService.java | 3 + .../service/process/ProcessServiceImpl.java | 6 +- .../service/subworkflow/SubWorkflowService.java | 29 +++ .../subworkflow/SubWorkflowServiceImpl.java | 102 ++++++++ .../plugin/task/api/TaskConstants.java | 2 + .../plugin/task/api/TaskPluginManager.java | 3 + 
.../task/api/model/DynamicInputParameter.java | 16 ++ .../task/api/parameters/DynamicParameters.java | 53 ++++ .../public/images/task-icons/dynamic.png | Bin 0 -> 10606 bytes .../public/images/task-icons/dynamic_hover.png | Bin 0 -> 11229 bytes dolphinscheduler-ui/src/common/common.ts | 17 +- dolphinscheduler-ui/src/common/types.ts | 1 + dolphinscheduler-ui/src/locales/en_US/project.ts | 12 +- dolphinscheduler-ui/src/locales/zh_CN/project.ts | 12 +- dolphinscheduler-ui/src/store/project/task-type.ts | 3 + dolphinscheduler-ui/src/store/project/types.ts | 1 + .../projects/task/components/node/detail-modal.tsx | 2 +- .../projects/task/components/node/fields/index.ts | 1 + .../task/components/node/fields/use-dynamic.ts | 120 +++++++++ .../projects/task/components/node/format-data.ts | 10 +- .../projects/task/components/node/tasks/index.ts | 3 + .../task/components/node/tasks/use-dynamic.ts | 96 +++++++ .../views/projects/task/components/node/types.ts | 4 + .../src/views/projects/task/constants/task-type.ts | 4 + .../workflow/components/dag/dag.module.scss | 6 + 52 files changed, 1444 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index b64f51a70b..947cb05951 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto; import org.apache.dolphinscheduler.api.enums.Status; import 
org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; @@ -319,6 +320,28 @@ public class ProcessInstanceController extends BaseController { return returnDataList(result); } + /** + * query dynamic sub process instance detail info by task id + * + * @param loginUser login user + * @param taskId task id + * @return sub process instance detail + */ + @Operation(summary = "queryDynamicSubWorkflowInstances", description = "QUERY_DYNAMIC_SUBPROCESS_INSTANCE_BY_TASK_CODE_NOTES") + @Parameters({ + @Parameter(name = "taskId", description = "taskInstanceId", required = true, schema = @Schema(implementation = int.class, example = "100")) + }) + @GetMapping(value = "/query-dynamic-sub-workflows") + @ResponseStatus(HttpStatus.OK) + @ApiException(Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result<List<DynamicSubWorkflowDto>> queryDynamicSubWorkflowInstances(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("taskId") Integer taskId) { + List<DynamicSubWorkflowDto> dynamicSubWorkflowDtos = + processInstanceService.queryDynamicSubWorkflowInstances(loginUser, taskId); + return new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg(), dynamicSubWorkflowDtos); + } + /** * query process instance global variables and local variables * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java new file mode 100644 index 0000000000..b688aa4066 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java @@ -0,0 +1,24 @@ +package org.apache.dolphinscheduler.api.dto; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; + +import java.util.Map; + 
+import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class DynamicSubWorkflowDto { + + private long processInstanceId; + + private String name; + + private long index; + + private Map<String, String> parameters; + + private WorkflowExecutionStatus state; + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index d29235439d..4ad45709cf 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -271,6 +271,8 @@ public enum Status { NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"), STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair", "状态码前后不一致或状态码和code不匹配"), + TASK_INSTANCE_NOT_DYNAMIC_TASK(10213, "task instance {0} is not dynamic", "任务实例[{0}]不是Dynamic类型"), + UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 304d0139e2..ccb3f98b4d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto; import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; import org.apache.dolphinscheduler.api.utils.PageInfo; import 
org.apache.dolphinscheduler.api.utils.Result; @@ -135,6 +136,9 @@ public interface ProcessInstanceService { long projectCode, Integer taskId); + List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, + Integer taskId); + /** * update process instance * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 26606caa3b..da165db9a0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; +import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; @@ -44,6 +45,7 @@ import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.UsersService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; @@ -57,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; 
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; @@ -66,6 +69,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; @@ -93,6 +97,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -184,6 +189,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired private ScheduleMapper scheduleMapper; + @Autowired + private RelationSubWorkflowMapper relationSubWorkflowMapper; + @Autowired private AlertDao alertDao; @@ -488,6 +496,66 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } + @Override + public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) { + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId); + Map<String, Object> result = new HashMap<>(); + if (taskInstance == null) { + putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); + throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId); + } + + 
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskInstance.getTaskCode()); + if (taskDefinition == null) { + putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); + throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId); + } + + if (!taskInstance.isDynamic()) { + putMsg(result, Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName()); + throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId); + } + List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper + .selectAllSubProcessInstance(Long.valueOf(taskInstance.getProcessInstanceId()), + taskInstance.getTaskCode()); + List<Long> allSubProcessInstanceId = relationSubWorkflows.stream() + .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList()); + List<ProcessInstance> allSubWorkflows = processInstanceDao.selectBatchIds(allSubProcessInstanceId); + + if (allSubWorkflows == null || allSubWorkflows.isEmpty()) { + putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId); + throw new ServiceException(Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId); + } + Long subWorkflowCode = allSubWorkflows.get(0).getProcessDefinitionCode(); + int subWorkflowVersion = allSubWorkflows.get(0).getProcessDefinitionVersion(); + ProcessDefinition subProcessDefinition = + processService.findProcessDefinition(subWorkflowCode, subWorkflowVersion); + if (subProcessDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode); + throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode); + } + + allSubWorkflows.sort(Comparator.comparing(ProcessInstance::getId)); + + List<DynamicSubWorkflowDto> allDynamicSubWorkflowDtos = new ArrayList<>(); + int index = 1; + for (ProcessInstance processInstance : allSubWorkflows) { + DynamicSubWorkflowDto dynamicSubWorkflowDto = new DynamicSubWorkflowDto(); + dynamicSubWorkflowDto.setProcessInstanceId(processInstance.getId()); + 
dynamicSubWorkflowDto.setIndex(index); + dynamicSubWorkflowDto.setState(processInstance.getState()); + dynamicSubWorkflowDto.setName(subProcessDefinition.getName()); + Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam()); + String parameter = commandParamMap.get(CommandKeyConstants.CMD_DYNAMIC_START_PARAMS); + dynamicSubWorkflowDto.setParameters(JSONUtils.toMap(parameter)); + allDynamicSubWorkflowDtos.add(dynamicSubWorkflowDto); + index++; + + } + + return allDynamicSubWorkflowDtos; + } + /** * add dependent result for dependent task */ diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-api/src/main/resources/task-type-config.yaml index 7a21c36946..9105d50697 100644 --- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml +++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml @@ -40,6 +40,7 @@ task: - 'DEPENDENT' - 'CONDITIONS' - 'SWITCH' + - 'DYNAMIC' dataIntegration: - 'SEATUNNEL' - 'DATAX' diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java index c1f801257a..68dfdd3a6f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java @@ -45,6 +45,8 @@ public class CommandKeyConstants { public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams"; + public static final String CMD_DYNAMIC_START_PARAMS = "dynamicParams"; + /** * complement data start date */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java index 7aa2b881b3..27a2dc7ac3 100644 --- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java @@ -56,6 +56,7 @@ public enum CommandType { RECOVER_WAITING_THREAD(10, "recover waiting thread"), RECOVER_SERIAL_WAIT(11, "recover serial wait"), EXECUTE_TASK(12, "start a task node in a process instance"), + DYNAMIC_GENERATION(13, "dynamic generation"), ; CommandType(int code, String descp) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java index 4fced36110..e8ec2dbc08 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java @@ -40,6 +40,7 @@ public enum WorkflowExecutionStatus { SERIAL_WAIT(14, "serial wait"), READY_BLOCK(15, "ready block"), BLOCK(16, "block"), + WAIT_TO_RUN(17, "wait to run"), ; private static final Map<Integer, WorkflowExecutionStatus> CODE_MAP = new HashMap<>(); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java index 9d30aa71ca..6d6e5497d4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java @@ -59,6 +59,7 @@ public class BusinessTimeUtils { case RECOVER_SUSPENDED_PROCESS: case START_FAILURE_TASK_PROCESS: case REPEAT_RUNNING: + case DYNAMIC_GENERATION: case SCHEDULER: default: businessDate = addDays(new Date(), 
-1); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java new file mode 100644 index 0000000000..bc19a9d130 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.dao.entity; + +import lombok.Data; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +@Data +@TableName("t_ds_relation_sub_workflow") +public class RelationSubWorkflow { + + /** + * id + */ + @TableId(value = "id", type = IdType.AUTO) + private Integer id; + + /** + * parent process instance id + */ + private Long parentWorkflowInstanceId; + + /** + * parent task instance id + */ + private Long parentTaskCode; + + /** + * process instance id + */ + private Long subWorkflowInstanceId; +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index cf5eb62ec7..0a0b4876e4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SEC_2_MINUT import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DYNAMIC; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; @@ -357,6 +358,10 @@ public class TaskInstance implements Serializable { return TASK_TYPE_DEPENDENT.equalsIgnoreCase(this.taskType); } + public boolean isDynamic() { + return TASK_TYPE_DYNAMIC.equalsIgnoreCase(this.taskType); + } + public boolean 
isConditionsTask() { return TASK_TYPE_CONDITIONS.equalsIgnoreCase(this.taskType); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java new file mode 100644 index 0000000000..8a3f0d350d --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * process instance map mapper interface + */ +public interface RelationSubWorkflowMapper extends BaseMapper<RelationSubWorkflow> { + + int batchInsert(@Param("relationSubWorkflows") List<RelationSubWorkflow> relationSubWorkflows); + + List<RelationSubWorkflow> selectAllSubProcessInstance(@Param("parentWorkflowInstanceId") Long parentWorkflowInstanceId, + @Param("parentTaskCode") Long parentTaskCode); + + RelationSubWorkflow queryParentWorkflowInstance(@Param("subWorkflowInstanceId") Long subWorkflowInstanceId); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index 0144356dfe..720c46d898 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -38,6 +38,8 @@ public interface ProcessInstanceDao { */ public int upsertProcessInstance(ProcessInstance processInstance); + List<ProcessInstance> selectBatchIds(List<Long> processInstanceIds); + void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds); void deleteById(Integer workflowInstanceId); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index 5117145fd4..8a8402f686 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.commons.collections4.CollectionUtils; +import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -65,6 +66,14 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { } } + @Override + public List<ProcessInstance> selectBatchIds(List<Long> processInstanceIds) { + if (CollectionUtils.isEmpty(processInstanceIds)) { + return new ArrayList<>(); + } + return processInstanceMapper.selectBatchIds(processInstanceIds); + } + @Override public void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds) { if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) { diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml new file mode 100644 index 0000000000..4f3687b212 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > +<mapper namespace="org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper"> + <sql id="baseSql"> + id, parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id + </sql> + <insert id="batchInsert"> + insert into t_ds_relation_sub_workflow (parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id) + values + <foreach collection="relationSubWorkflows" item="relationSubWorkflow" separator=","> + (#{relationSubWorkflow.parentWorkflowInstanceId}, #{relationSubWorkflow.parentTaskCode}, #{relationSubWorkflow.subWorkflowInstanceId}) + </foreach> + </insert> + <select id="selectAllSubProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow"> + select + id, parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id + FROM t_ds_relation_sub_workflow + WHERE parent_workflow_instance_id = #{parentWorkflowInstanceId} + AND parent_task_code = #{parentTaskCode} + </select> + <select id="queryParentWorkflowInstance" resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow"> + select + id, parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id + FROM t_ds_relation_sub_workflow + WHERE sub_workflow_instance_id = #{subWorkflowInstanceId} + </select> +</mapper> diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index d45305336b..70ec0ab255 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -2016,3 +2016,16 @@ CREATE TABLE `t_ds_trigger_relation` ( KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`), UNIQUE KEY 
`t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin; + + +DROP TABLE IF EXISTS `t_ds_relation_sub_workflow`; +CREATE TABLE `t_ds_relation_sub_workflow` ( + `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'key', + `parent_workflow_instance_id` bigint NOT NULL COMMENT 'parent process instance id', + `parent_task_code` bigint NOT NULL COMMENT 'parent process instance id', + `sub_workflow_instance_id` bigint NOT NULL COMMENT 'child process instance id', + PRIMARY KEY (`id`), + KEY `idx_parent_workflow_instance_id` (`parent_workflow_instance_id`), + KEY `idx_parent_task_code` (`parent_task_code`), + KEY `idx_sub_workflow_instance_id` (`sub_workflow_instance_id`) +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE utf8_bin; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java index e01955d81a..16476ad7f7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java @@ -37,7 +37,7 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler { @Override public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, - StateEvent stateEvent) throws StateEventHandleError { + StateEvent stateEvent) throws StateEventHandleError, StateEventHandleException { TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent; TaskMetrics.incTaskInstanceByState("timeout"); @@ -62,6 +62,7 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler { || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) { if 
(taskExecuteRunnableMap.containsKey(taskInstance.getTaskCode())) { taskExecuteRunnableMap.get(taskInstance.getTaskCode()).timeout(); + workflowExecuteRunnable.taskFinished(taskInstance); } else { log.warn( "cannot find the task processor for task {}, so skip task processor action.", diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 7f77a5c99f..e8cdeb9069 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -446,8 +446,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> { if (taskInstance.getState().isSuccess()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); - // todo: merge the last taskInstance - processInstance.setVarPool(taskInstance.getVarPool()); + mergeTaskInstanceVarPool(taskInstance); processInstanceDao.upsertProcessInstance(processInstance); // save the cacheKey only if the task is defined as cache task and the task is success if (taskInstance.getIsCache().equals(Flag.YES)) { @@ -2203,4 +2202,25 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> { private boolean isExecutedOnMaster(String host) { return host.endsWith(masterAddress.split(Constants.COLON)[1]); } + + private void mergeTaskInstanceVarPool(TaskInstance taskInstance) { + String taskVarPoolJson = taskInstance.getVarPool(); + if (StringUtils.isEmpty(taskVarPoolJson)) { + return; + } + String processVarPoolJson = processInstance.getVarPool(); + if (StringUtils.isEmpty(processVarPoolJson)) { + processInstance.setVarPool(taskVarPoolJson); + return; + } + List<Property> processVarPool = new 
ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class)); + List<Property> taskVarPool = JSONUtils.toList(taskVarPoolJson, Property.class); + Set<String> newProcessVarPoolKeys = taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet()); + processVarPool = processVarPool.stream().filter(property -> !newProcessVarPoolKeys.contains(property.getProp())) + .collect(Collectors.toList()); + + processVarPool.addAll(taskVarPool); + + processInstance.setVarPool(JSONUtils.toJsonString(processVarPool)); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java index f196d85153..c5689f6a1b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask; +import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; import java.util.Set; @@ -38,7 +39,8 @@ public class MasterTaskExecuteRunnableFactoryBuilder { private static final Set<String> ASYNC_TASK_TYPE = Sets.newHashSet( DependentLogicTask.TASK_TYPE, - SubWorkflowLogicTask.TASK_TYPE); + SubWorkflowLogicTask.TASK_TYPE, + DynamicLogicTask.TASK_TYPE); public MasterDelayTaskExecuteRunnableFactory<? 
extends MasterDelayTaskExecuteRunnable> createWorkerDelayTaskExecuteRunnableFactory(String taskType) { if (ASYNC_TASK_TYPE.contains(taskType)) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java new file mode 100644 index 0000000000..b433b6f3bc --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner.task.dynamic; + +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_DYNAMIC_START_PARAMS; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction; +import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DynamicAsyncTaskExecuteFunction implements AsyncTaskExecuteFunction { + + private static final Duration TASK_EXECUTE_STATE_CHECK_INTERVAL = Duration.ofSeconds(10); + + private static final String OUTPUT_KEY = "dynamic.out"; + + private final ProcessInstance processInstance; + + private final TaskInstance taskInstance; + + private final SubWorkflowService subWorkflowService; + + private final CommandMapper commandMapper; + + private final int degreeOfParallelism; + + private final DynamicLogicTask logicTask; + + public DynamicAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext, + ProcessInstance processInstance, + TaskInstance taskInstance, + DynamicLogicTask dynamicLogicTask, + CommandMapper commandMapper, + SubWorkflowService subWorkflowService, + int degreeOfParallelism) { + this.processInstance = processInstance; + 
this.taskInstance = taskInstance; + this.logicTask = dynamicLogicTask; + this.degreeOfParallelism = degreeOfParallelism; + + this.commandMapper = commandMapper; + this.subWorkflowService = subWorkflowService; + } + + @Override + public @NonNull AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() { + List<ProcessInstance> allSubProcessInstance = getAllSubProcessInstance(); + int totalSubProcessInstanceCount = allSubProcessInstance.size(); + + List<ProcessInstance> finishedSubProcessInstance = + subWorkflowService.filterFinishProcessInstances(allSubProcessInstance); + + if (finishedSubProcessInstance.size() == totalSubProcessInstanceCount) { + log.info("all sub process instance finish"); + int successCount = subWorkflowService.filterSuccessProcessInstances(finishedSubProcessInstance).size(); + log.info("success sub process instance count: {}", successCount); + if (successCount == totalSubProcessInstanceCount) { + log.info("all sub process instance success"); + setOutputParameters(); + return AsyncTaskExecutionStatus.SUCCESS; + } else { + int failedCount = totalSubProcessInstanceCount - successCount; + log.info("failed sub process instance count: {}", failedCount); + return AsyncTaskExecutionStatus.FAILED; + } + } + + if (logicTask.isCancel()) { + return AsyncTaskExecutionStatus.FAILED; + } + + int runningCount = subWorkflowService.filterRunningProcessInstances(allSubProcessInstance).size(); + int startCount = degreeOfParallelism - runningCount; + if (startCount > 0) { + log.info("There are {} sub process instances that can be started", startCount); + startSubProcessInstances(allSubProcessInstance, startCount); + } + // query the status of sub workflow instance + return AsyncTaskExecutionStatus.RUNNING; + } + + private void setOutputParameters() { + log.info("set varPool"); + List<ProcessInstance> allSubProcessInstance = getAllSubProcessInstance(); + + List<DynamicOutput> dynamicOutputs = new ArrayList<>(); + int index = 1; + for (ProcessInstance processInstance : 
allSubProcessInstance) { + DynamicOutput dynamicOutput = new DynamicOutput(); + Map<String, String> dynamicParams = + JSONUtils.toMap(JSONUtils.toMap(processInstance.getCommandParam()).get(CMD_DYNAMIC_START_PARAMS)); + dynamicOutput.setDynParams(dynamicParams); + + Map<String, String> outputValueMap = new HashMap<>(); + List<Property> propertyList = subWorkflowService.getWorkflowOutputParameters(processInstance); + for (Property property : propertyList) { + outputValueMap.put(property.getProp(), property.getValue()); + } + + dynamicOutput.setOutputValue(outputValueMap); + dynamicOutput.setMappedTimes(index++); + dynamicOutputs.add(dynamicOutput); + } + + Property property = new Property(); + property.setProp(String.format("%s(%s)", OUTPUT_KEY, taskInstance.getName())); + property.setDirect(Direct.OUT); + property.setType(DataType.VARCHAR); + property.setValue(JSONUtils.toJsonString(dynamicOutputs)); + + List<Property> taskPropertyList = new ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class)); + taskPropertyList.add(property); + logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList)); + + log.info("set property: {}", property); + } + + private void startSubProcessInstances(List<ProcessInstance> allSubProcessInstance, int startCount) { + List<ProcessInstance> waitingProcessInstances = + subWorkflowService.filterWaitToRunProcessInstances(allSubProcessInstance); + + for (int i = 0; i < Math.min(startCount, waitingProcessInstances.size()); i++) { + ProcessInstance subProcessInstance = waitingProcessInstances.get(i); + Map<String, String> parameters = JSONUtils.toMap(DynamicCommandUtils + .getDataFromCommandParam(subProcessInstance.getCommandParam(), CMD_DYNAMIC_START_PARAMS)); + Command command = DynamicCommandUtils.createCommand(this.processInstance, + subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion(), + parameters); + command.setProcessInstanceId(subProcessInstance.getId()); 
+ commandMapper.insert(command); + log.info("start sub process instance, sub process instance id: {}, command: {}", subProcessInstance.getId(), + command); + } + } + + public List<ProcessInstance> getAllSubProcessInstance() { + return subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode()); + } + + @Override + public @NonNull Duration getAsyncTaskStateCheckInterval() { + return TASK_EXECUTE_STATE_CHECK_INTERVAL; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java new file mode 100644 index 0000000000..2acc9752eb --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java @@ -0,0 +1,68 @@ +package org.apache.dolphinscheduler.server.master.runner.task.dynamic; + +import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; + +public class DynamicCommandUtils { + + static public Command createCommand(ProcessInstance processInstance, + Long subProcessDefinitionCode, + Integer subProcessDefinitionVersion, + Map<String, String> parameters) { + Command command = new Command(); + if (processInstance.getCommandType().equals(CommandType.START_PROCESS)) { + 
command.setCommandType(CommandType.DYNAMIC_GENERATION); + } else { + command.setCommandType(processInstance.getCommandType()); + } + command.setProcessDefinitionCode(subProcessDefinitionCode); + command.setProcessDefinitionVersion(subProcessDefinitionVersion); + command.setTaskDependType(TaskDependType.TASK_POST); + command.setFailureStrategy(processInstance.getFailureStrategy()); + command.setWarningType(processInstance.getWarningType()); + + String globalParams = processInstance.getGlobalParams(); + if (StringUtils.isNotEmpty(globalParams)) { + List<Property> parentParams = Lists.newArrayList(JSONUtils.toList(globalParams, Property.class)); + for (Property parentParam : parentParams) { + parameters.put(parentParam.getProp(), parentParam.getValue()); + } + } + + addDataToCommandParam(command, CommandKeyConstants.CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(parameters)); + command.setExecutorId(processInstance.getExecutorId()); + command.setWarningGroupId(processInstance.getWarningGroupId()); + command.setProcessInstancePriority(processInstance.getProcessInstancePriority()); + command.setWorkerGroup(processInstance.getWorkerGroup()); + command.setDryRun(processInstance.getDryRun()); + return command; + } + + static public String getDataFromCommandParam(String commandParam, String key) { + Map<String, String> cmdParam = JSONUtils.toMap(commandParam); + return cmdParam.get(key); + } + + static void addDataToCommandParam(Command command, String key, String data) { + Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); + if (cmdParam == null) { + cmdParam = new HashMap<>(); + } + cmdParam.put(key, data); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java new file mode 
100644 index 0000000000..e0b93994ad --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner.task.dynamic; + +import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter; +import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters; +import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; +import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient; +import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction; +import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; 
+import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; + +@Slf4j +public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> { + + public static final String TASK_TYPE = "DYNAMIC"; + private final ProcessInstanceDao processInstanceDao; + + private final SubWorkflowService subWorkflowService; + + private final ProcessDefinitionMapper processDefineMapper; + + private final CommandMapper commandMapper; + + private final ProcessService processService; + + private ProcessInstance processInstance; + + private TaskInstance taskInstance; + + private final MasterRpcClient masterRpcClient; + + private boolean haveBeenCanceled = false; + + public DynamicLogicTask(TaskExecutionContext taskExecutionContext, + ProcessInstanceDao processInstanceDao, + TaskInstanceDao taskInstanceDao, + SubWorkflowService subWorkflowService, + ProcessService processService, + MasterRpcClient masterRpcClient, + ProcessDefinitionMapper processDefineMapper, + CommandMapper commandMapper) { + super(taskExecutionContext, + JSONUtils.parseObject(taskExecutionContext.getTaskParams(), new TypeReference<DynamicParameters>() { + })); + this.processInstanceDao = processInstanceDao; + this.subWorkflowService = subWorkflowService; + this.processService = processService; + this.masterRpcClient = masterRpcClient; + this.processDefineMapper = processDefineMapper; + this.commandMapper = commandMapper; + + this.processInstance = + processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId()); + this.taskInstance = taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); + } + + @Override + public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws MasterTaskExecuteException { + List<Map<String, String>> parameterGroup = generateParameterGroup(); + + if (parameterGroup.size() > 
taskParameters.getMaxNumOfSubWorkflowInstances()) { + log.warn("the number of sub process instances [{}] exceeds the maximum limit [{}]", parameterGroup.size(), + taskParameters.getMaxNumOfSubWorkflowInstances()); + parameterGroup = parameterGroup.subList(0, taskParameters.getMaxNumOfSubWorkflowInstances()); + } + + // if already exists sub process instance, do not generate again + List<ProcessInstance> existsSubProcessInstanceList = + subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode()); + if (CollectionUtils.isEmpty(existsSubProcessInstanceList)) { + generateSubWorkflowInstance(parameterGroup); + } else { + resetProcessInstanceStatus(existsSubProcessInstanceList); + } + return new DynamicAsyncTaskExecuteFunction(taskExecutionContext, processInstance, taskInstance, this, + commandMapper, + subWorkflowService, taskParameters.getDegreeOfParallelism()); + } + + public void resetProcessInstanceStatus(List<ProcessInstance> existsSubProcessInstanceList) { + switch (processInstance.getCommandType()) { + case REPEAT_RUNNING: + existsSubProcessInstanceList.forEach(processInstance -> { + processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); + processInstanceDao.updateProcessInstance(processInstance); + }); + break; + case START_FAILURE_TASK_PROCESS: + case RECOVER_TOLERANCE_FAULT_PROCESS: + List<ProcessInstance> failedProcessInstances = + subWorkflowService.filterFailedProcessInstances(existsSubProcessInstanceList); + failedProcessInstances.forEach(processInstance -> { + processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); + processInstanceDao.updateProcessInstance(processInstance); + }); + break; + } + } + + public void generateSubWorkflowInstance(List<Map<String, String>> parameterGroup) throws MasterTaskExecuteException { + List<ProcessInstance> processInstanceList = new ArrayList<>(); + ProcessDefinition subProcessDefinition = + processDefineMapper.queryByCode(taskParameters.getProcessDefinitionCode()); + 
for (Map<String, String> parameters : parameterGroup) { + String dynamicStartParams = JSONUtils.toJsonString(parameters); + Command command = DynamicCommandUtils.createCommand(processInstance, subProcessDefinition.getCode(), + subProcessDefinition.getVersion(), parameters); + // todo: set id to -1? we use command to generate sub process instance, but the generate method will use the + // command id to do + // something + command.setId(-1); + DynamicCommandUtils.addDataToCommandParam(command, CommandKeyConstants.CMD_DYNAMIC_START_PARAMS, + dynamicStartParams); + ProcessInstance subProcessInstance = createSubProcessInstance(command); + subProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN); + processInstanceDao.insertProcessInstance(subProcessInstance); + command.setProcessInstanceId(subProcessInstance.getId()); + processInstanceList.add(subProcessInstance); + } + + List<RelationSubWorkflow> relationSubWorkflowList = new ArrayList<>(); + for (ProcessInstance subProcessInstance : processInstanceList) { + RelationSubWorkflow relationSubWorkflow = new RelationSubWorkflow(); + relationSubWorkflow.setParentWorkflowInstanceId(Long.valueOf(processInstance.getId())); + relationSubWorkflow.setParentTaskCode(taskInstance.getTaskCode()); + relationSubWorkflow.setSubWorkflowInstanceId(Long.valueOf(subProcessInstance.getId())); + relationSubWorkflowList.add(relationSubWorkflow); + } + + log.info("Expected number of runs : {}, actual number of runs : {}", parameterGroup.size(), + processInstanceList.size()); + + int insertN = subWorkflowService.batchInsertRelationSubWorkflow(relationSubWorkflowList); + log.info("insert {} relation sub workflow", insertN); + } + + public ProcessInstance createSubProcessInstance(Command command) throws MasterTaskExecuteException { + ProcessInstance subProcessInstance; + try { + subProcessInstance = processService.constructProcessInstance(command, processInstance.getHost()); + } catch (Exception e) { + log.error("create sub process
instance error", e); + throw new MasterTaskExecuteException(e.getMessage()); + } + return subProcessInstance; + } + + public List<Map<String, String>> generateParameterGroup() { + List<DynamicInputParameter> dynamicInputParameters = taskParameters.getListParameters(); + Set<String> filterStrings = + Arrays.stream(StringUtils.split(taskParameters.getFilterCondition(), ",")).map(String::trim) + .collect(Collectors.toSet()); + + List<List<DynamicInputParameter>> allParameters = new ArrayList<>(); + for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) { + List<DynamicInputParameter> singleParameters = new ArrayList<>(); + String value = dynamicInputParameter.getValue(); + String separator = dynamicInputParameter.getSeparator(); + List<String> valueList = + Arrays.stream(StringUtils.split(value, separator)).map(String::trim).collect(Collectors.toList()); + + valueList = valueList.stream().filter(v -> !filterStrings.contains(v)).collect(Collectors.toList()); + + for (String v : valueList) { + DynamicInputParameter singleParameter = new DynamicInputParameter(); + singleParameter.setName(dynamicInputParameter.getName()); + singleParameter.setValue(v); + singleParameters.add(singleParameter); + } + allParameters.add(singleParameters); + } + + // use Lists.cartesianProduct to get the cartesian product of all parameters + List<List<DynamicInputParameter>> cartesianProduct = Lists.cartesianProduct(allParameters); + + // convert cartesian product to parameter group List<Map<name:value>> + List<Map<String, String>> parameterGroup = cartesianProduct.stream().map( + inputParameterList -> inputParameterList.stream().collect( + Collectors.toMap(DynamicInputParameter::getName, DynamicInputParameter::getValue))) + .collect(Collectors.toList()); + + log.info("parameter group size: {}", parameterGroup.size()); + // log every parameter group + if (CollectionUtils.isNotEmpty(parameterGroup)) { + for (Map<String, String> map : parameterGroup) { + log.info("parameter
group: {}", map); + } + } + return parameterGroup; + } + + @Override + public void kill() { + try { + changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP); + } catch (MasterTaskExecuteException e) { + log.error("kill " + taskInstance.getName() + " error", e); + } + } + + private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException { + this.haveBeenCanceled = true; + List<ProcessInstance> existsSubProcessInstanceList = + subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode()); + List<ProcessInstance> runningSubProcessInstanceList = + subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList); + for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) { + subProcessInstance.setState(stopStatus); + processInstanceDao.updateProcessInstance(subProcessInstance); + if (subProcessInstance.getState().isFinished()) { + log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId()); + return; + } + try { + sendToSubProcess(taskExecutionContext, subProcessInstance); + log.info("Success send [{}] request to SubWorkflow's master: {}", stopStatus, + subProcessInstance.getHost()); + } catch (RemotingException e) { + throw new MasterTaskExecuteException( + String.format("Send stop request to SubWorkflow's master: %s failed", + subProcessInstance.getHost()), + e); + } + } + } + + private void sendToSubProcess(TaskExecutionContext taskExecutionContext, + ProcessInstance subProcessInstance) throws RemotingException { + WorkflowStateEventChangeRequest stateEventChangeCommand = new WorkflowStateEventChangeRequest( + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId(), + subProcessInstance.getState(), + subProcessInstance.getId(), + 0); + Host host = new Host(subProcessInstance.getHost()); + masterRpcClient.send(host, stateEventChangeCommand.convert2Command()); + 
} + + public boolean isCancel() { + return haveBeenCanceled; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java new file mode 100644 index 0000000000..cbcc50ada6 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner.task.dynamic; + +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient; +import org.apache.dolphinscheduler.server.master.runner.task.ILogicTaskPluginFactory; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class DynamicLogicTaskPluginFactory implements ILogicTaskPluginFactory<DynamicLogicTask> { + + @Autowired + private ProcessInstanceDao processInstanceDao; + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Autowired + private ProcessDefinitionMapper processDefineMapper; + + @Autowired + private CommandMapper commandMapper; + + @Autowired + private ProcessService processService; + + @Autowired + SubWorkflowService subWorkflowService; + + @Autowired + private MasterRpcClient masterRpcClient; + + @Override + public DynamicLogicTask createLogicTask(TaskExecutionContext taskExecutionContext) { + return new DynamicLogicTask(taskExecutionContext, processInstanceDao, taskInstanceDao, subWorkflowService, + processService, + masterRpcClient, processDefineMapper, commandMapper); + + } + + @Override + public String getTaskType() { + return DynamicLogicTask.TASK_TYPE; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java new file mode 100644 index 0000000000..77e29cbac3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.server.master.runner.task.dynamic; + +import java.util.Map; + +import lombok.Data; + +@Data +public class DynamicOutput { + + private Map<String, String> dynParams; + + private Map<String, String> outputValue; + + private int mappedTimes; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java index 366fe53cec..7b3008b581 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java @@ -123,8 +123,8 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask<SubProcessParameter log.info("TaskInstance is null"); return; } - if (taskInstance.getState().isFinished()) { - log.info("The task instance is finished, no need to pause"); + if (subProcessInstance.getState().isFinished()) { + log.info("The subProcessInstance is finished, no need to pause"); return; } subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java index d717e69362..eb1fb27f37 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils; import org.apache.dolphinscheduler.server.master.runner.task.blocking.BlockingLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.condition.ConditionLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask; +import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask; @@ -38,7 +39,8 @@ public class TaskUtils { ConditionLogicTask.TASK_TYPE, DependentLogicTask.TASK_TYPE, SubWorkflowLogicTask.TASK_TYPE, - SwitchLogicTask.TASK_TYPE); + SwitchLogicTask.TASK_TYPE, + DynamicLogicTask.TASK_TYPE); public boolean isMasterTask(String taskType) { return MASTER_TASK_TYPES.contains(taskType); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index a7e01101a7..3d2757d76a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -64,6 +64,9 @@ public interface ProcessService { ProcessInstance handleCommand(String host, Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException; + ProcessInstance constructProcessInstance(Command command, + String host) throws CronParseException, CodeGenerateUtils.CodeGenerateException; + Optional<ProcessInstance> 
findProcessInstanceDetailById(int processId); ProcessInstance findProcessInstanceById(int processId); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index d0f56f329f..a2064ae2a8 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -744,8 +744,8 @@ public class ProcessServiceImpl implements ProcessService { * @param host host * @return process instance */ - protected @Nullable ProcessInstance constructProcessInstance(Command command, - String host) throws CronParseException, CodeGenerateException { + public @Nullable ProcessInstance constructProcessInstance(Command command, + String host) throws CronParseException, CodeGenerateException { ProcessInstance processInstance; ProcessDefinition processDefinition; CommandType commandType = command.getCommandType(); @@ -765,6 +765,7 @@ public class ProcessServiceImpl implements ProcessService { processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); } else { processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null); + setGlobalParamIfCommanded(processDefinition, cmdParam); if (processInstance == null) { return null; } @@ -816,6 +817,7 @@ public class ProcessServiceImpl implements ProcessService { int runTime = processInstance.getRunTimes(); switch (commandType) { case START_PROCESS: + case DYNAMIC_GENERATION: break; case START_FAILURE_TASK_PROCESS: // find failed tasks and init these tasks diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java new file mode 100644 index 0000000000..4c2bd9978d --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java @@ -0,0 +1,29 @@ +package org.apache.dolphinscheduler.service.subworkflow; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import java.util.List; + +import org.springframework.stereotype.Component; + +@Component +public interface SubWorkflowService { + + List<ProcessInstance> getAllDynamicSubWorkflow(long processInstanceId, long taskCode); + + int batchInsertRelationSubWorkflow(List<RelationSubWorkflow> relationSubWorkflowList); + + List<ProcessInstance> filterFinishProcessInstances(List<ProcessInstance> processInstanceList); + + List<ProcessInstance> filterSuccessProcessInstances(List<ProcessInstance> processInstanceList); + + List<ProcessInstance> filterRunningProcessInstances(List<ProcessInstance> processInstanceList); + + List<ProcessInstance> filterWaitToRunProcessInstances(List<ProcessInstance> processInstanceList); + + List<ProcessInstance> filterFailedProcessInstances(List<ProcessInstance> processInstanceList); + + List<Property> getWorkflowOutputParameters(ProcessInstance processInstance); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java new file mode 100644 index 0000000000..82b9342a5e --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java @@ -0,0 +1,102 @@ +package org.apache.dolphinscheduler.service.subworkflow; + +import 
org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class SubWorkflowServiceImpl implements SubWorkflowService { + + @Autowired + private RelationSubWorkflowMapper relationSubWorkflowMapper; + + @Autowired + private ProcessInstanceDao processInstanceDao; + + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + + @Override + public List<ProcessInstance> getAllDynamicSubWorkflow(long processInstanceId, long taskCode) { + List<RelationSubWorkflow> relationSubWorkflows = + relationSubWorkflowMapper.selectAllSubProcessInstance(processInstanceId, taskCode); + List<Long> allSubProcessInstanceId = relationSubWorkflows.stream() + .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList()); + + List<ProcessInstance> allSubProcessInstance = processInstanceDao.selectBatchIds(allSubProcessInstanceId); + allSubProcessInstance.sort(Comparator.comparing(ProcessInstance::getId)); + return allSubProcessInstance; + } + + @Override + public int batchInsertRelationSubWorkflow(List<RelationSubWorkflow> relationSubWorkflowList) { + int insertN = relationSubWorkflowMapper.batchInsert(relationSubWorkflowList); + 
return insertN; + } + + @Override + public List<ProcessInstance> filterFinishProcessInstances(List<ProcessInstance> processInstanceList) { + return processInstanceList.stream() + .filter(subProcessInstance -> subProcessInstance.getState().isFinished()).collect(Collectors.toList()); + } + + @Override + public List<ProcessInstance> filterSuccessProcessInstances(List<ProcessInstance> processInstanceList) { + return processInstanceList.stream() + .filter(subProcessInstance -> subProcessInstance.getState().isSuccess()).collect(Collectors.toList()); + } + + @Override + public List<ProcessInstance> filterRunningProcessInstances(List<ProcessInstance> processInstanceList) { + return processInstanceList.stream() + .filter(subProcessInstance -> subProcessInstance.getState().isRunning()).collect(Collectors.toList()); + } + + @Override + public List<ProcessInstance> filterWaitToRunProcessInstances(List<ProcessInstance> processInstanceList) { + return processInstanceList.stream() + .filter(subProcessInstance -> subProcessInstance.getState().equals(WorkflowExecutionStatus.WAIT_TO_RUN)) + .collect(Collectors.toList()); + } + + @Override + public List<ProcessInstance> filterFailedProcessInstances(List<ProcessInstance> processInstanceList) { + return processInstanceList.stream() + .filter(subProcessInstance -> subProcessInstance.getState().isFailure()).collect(Collectors.toList()); + } + + @Override + public List<Property> getWorkflowOutputParameters(ProcessInstance processInstance) { + List<Property> outputParamList = + new ArrayList<>(JSONUtils.toList(processInstance.getVarPool(), Property.class)); + + ProcessDefinitionLog processDefinition = processDefinitionLogMapper + .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + List<Property> globalParamList = JSONUtils.toList(processDefinition.getGlobalParams(), Property.class); + + Set<String> ouputParamSet = 
outputParamList.stream().map(Property::getProp).collect(Collectors.toSet()); + + // add output global parameters which are not in output parameters list + globalParamList.stream().filter(globalParam -> !ouputParamSet.contains(globalParam.getProp())) + .forEach(outputParamList::add); + + return outputParamList; + + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index bb7c8fc439..44ced03e27 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -400,6 +400,8 @@ public class TaskConstants { public static final String TASK_TYPE_SUB_PROCESS = "SUB_PROCESS"; + public static final String TASK_TYPE_DYNAMIC = "DYNAMIC"; + public static final String TASK_TYPE_DEPENDENT = "DEPENDENT"; public static final String TASK_TYPE_SQL = "SQL"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java index d16121f150..dff762300e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; import 
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -102,6 +103,8 @@ public class TaskPluginManager { return JSONUtils.parseObject(parametersNode.getTaskParams(), DependentParameters.class); case TaskConstants.TASK_TYPE_BLOCKING: return JSONUtils.parseObject(parametersNode.getTaskParams(), BlockingParameters.class); + case TaskConstants.TASK_TYPE_DYNAMIC: + return JSONUtils.parseObject(parametersNode.getTaskParams(), DynamicParameters.class); default: TaskChannel taskChannel = this.getTaskChannelMap().get(taskType); if (Objects.isNull(taskChannel)) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java new file mode 100644 index 0000000000..7582a23656 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.plugin.task.api.model; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; + +@Data +@NoArgsConstructor +public class DynamicInputParameter { + + @NonNull + private String name; + @NonNull + private String value; + private String separator = ","; +} diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java new file mode 100644 index 0000000000..e918db3b18 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.parameters; + +import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter; + +import java.util.List; + +import lombok.Data; + +/** + * Parameters of the DYNAMIC task type. + */ +@Data +public class DynamicParameters extends AbstractParameters { + + /** + * code of the process definition referenced by this task + */ + private long processDefinitionCode; + + private int maxNumOfSubWorkflowInstances; + + private int degreeOfParallelism; + + private String filterCondition; + + private List<DynamicInputParameter> listParameters; + + /** + * Checks that at least one list parameter is configured and that a process definition code is set. + * @return true if listParameters is non-empty and processDefinitionCode is not 0 + */ + @Override + public boolean checkParameters() { + if (listParameters == null || listParameters.isEmpty()) { + return false; + } + return this.processDefinitionCode != 0; + } +} diff --git a/dolphinscheduler-ui/public/images/task-icons/dynamic.png b/dolphinscheduler-ui/public/images/task-icons/dynamic.png new file mode 100644 index 0000000000..6df7485872 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dynamic.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png b/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png new file mode 100644 index 0000000000..b8b43135cc Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png differ diff --git a/dolphinscheduler-ui/src/common/common.ts b/dolphinscheduler-ui/src/common/common.ts index aa98186c5d..1e2892064e 100644 --- a/dolphinscheduler-ui/src/common/common.ts +++ b/dolphinscheduler-ui/src/common/common.ts @@ -30,7 +30,8 @@ import { StopOutlined, IssuesCloseOutlined, SendOutlined, - HistoryOutlined + HistoryOutlined, + HourglassOutlined } from '@vicons/antd' import { format, parseISO } from 'date-fns' import _ from 'lodash' @@ -123,6 +124,10 @@ export const runningType = (t: any) => [ { desc: `${t('project.workflow.execute_task')}`, code: 'EXECUTE_TASK' + }, + { + desc: `${t('project.workflow.dynamic_generation')}`, + code: 'DYNAMIC_GENERATION' } ] 
@@ -378,7 +383,15 @@ export const workflowExecutionState = (t: any): IWorkflowExecutionStateConfig => icon: HistoryOutlined, isSpin: false, classNames: 'pending' - } + }, + WAIT_TO_RUN: { + id: 18, + desc: `${t('project.overview.wait_to_run')}`, + color: '#5102ce', + icon: HourglassOutlined, + isSpin: false, + classNames: 'wait_to_run' + }, }) /** diff --git a/dolphinscheduler-ui/src/common/types.ts b/dolphinscheduler-ui/src/common/types.ts index a07a13c005..ec40e638b5 100644 --- a/dolphinscheduler-ui/src/common/types.ts +++ b/dolphinscheduler-ui/src/common/types.ts @@ -41,6 +41,7 @@ export type IWorkflowExecutionState = | 'SERIAL_WAIT' | 'READY_BLOCK' | 'BLOCK' + | 'WAIT_TO_RUN' export type ITaskStateConfig = { [key in ITaskState]: { diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index f6e072960e..dff1a0c18b 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -168,6 +168,7 @@ export default { recovery_waiting_thread: 'Recovery waiting thread', recover_serial_wait: 'Recover serial wait', execute_task: 'Execute the specified task', + dynamic_generation: 'Dynamic Generation', recovery_suspend: 'Recovery Suspend', recovery_failed: 'Recovery Failed', gantt: 'Gantt', @@ -832,7 +833,16 @@ export default { dependent_failure_policy_waiting: 'waiting', dependent_failure_waiting_time: 'Dependent failure waiting time', dependent_failure_waiting_time_tips: - 'Failure waiting time must be a positive integer' + 'Failure waiting time must be a positive integer', + max_num_of_sub_workflow_instances: 'Max number of sub-workflow instances', + filter_condition: 'Filter Condition', + params_value: 'Params Value', + separator: 'Separator', + dynamic_name_tips: 'name(required)', + dynamic_value_tips: 'params or value(required)', + dynamic_separator_tips: 'separator(required)', + child_node_definition: 'child node definition', + child_node_instance: 'child node instance', }, menu: { 
fav: 'Favorites', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index ccb68b80aa..e469d0fce8 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -169,6 +169,7 @@ export default { recovery_waiting_thread: '恢复等待线程', recover_serial_wait: '串行恢复', execute_task: '执行指定任务', + dynamic_generation: '动态生成', recovery_suspend: '恢复运行', recovery_failed: '重跑失败任务', gantt: '甘特图', @@ -808,7 +809,16 @@ export default { dependent_failure_policy_failure: '失败', dependent_failure_policy_waiting: '等待', dependent_failure_waiting_time: '依赖失败等待时间', - dependent_failure_waiting_time_tips: '失败等待时间必须为正整数' + dependent_failure_waiting_time_tips: '失败等待时间必须为正整数', + max_num_of_sub_workflow_instances: '动态生成实例上限', + filter_condition: '过滤条件', + params_value: '取值参数', + separator: '分隔符', + dynamic_name_tips: 'name(必填)', + dynamic_value_tips: 'params or value(必填)', + dynamic_separator_tips: '分隔符(必填)', + child_node_definition: '子节点定义', + child_node_instance: '子节点实例', }, menu: { fav: '收藏组件', diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts b/dolphinscheduler-ui/src/store/project/task-type.ts index 993cf5a1f0..4583335a1a 100644 --- a/dolphinscheduler-ui/src/store/project/task-type.ts +++ b/dolphinscheduler-ui/src/store/project/task-type.ts @@ -32,6 +32,9 @@ export const TASK_TYPES_MAP = { SUB_PROCESS: { alias: 'SUB_PROCESS' }, + DYNAMIC: { + alias: 'DYNAMIC' + }, PROCEDURE: { alias: 'PROCEDURE' }, diff --git a/dolphinscheduler-ui/src/store/project/types.ts b/dolphinscheduler-ui/src/store/project/types.ts index 7c2136f08c..e486bd9c5c 100644 --- a/dolphinscheduler-ui/src/store/project/types.ts +++ b/dolphinscheduler-ui/src/store/project/types.ts @@ -23,6 +23,7 @@ type TaskExecuteType = 'STREAM' | 'BATCH' type TaskType = | 'SHELL' | 'SUB_PROCESS' + | 'DYNAMIC' | 'PROCEDURE' | 'SQL' | 'SPARK' diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx b/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx index a76515d134..390968b2c5 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx @@ -158,7 +158,7 @@ const NodeDetailModal = defineComponent({ }, { text: t('project.node.enter_this_child_node'), - show: props.data.taskType === 'SUB_PROCESS', + show: props.data.taskType === 'SUB_PROCESS' || props.data.taskType === 'DYNAMIC', disabled: !props.data.id || (router.currentRoute.value.name === 'workflow-instance-detail' && diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts index 0015118fa2..0245e55dcf 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts @@ -88,3 +88,4 @@ export { useKubeflow } from './use-kubeflow' export { useLinkis } from './use-linkis' export { useDataFactory } from './use-data-factory' export { useRemoteShell } from './use-remote-shell' +export { useDynamic } from './use-dynamic' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts new file mode 100644 index 0000000000..08e47c2d98 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type { IJsonItem } from '../types' +import { useI18n } from 'vue-i18n' + +export function useDynamic(model: { [field: string]: any }): IJsonItem[] { + const { t } = useI18n() + + return [ + { + type: 'input-number', + field: 'maxNumOfSubWorkflowInstances', + span: 12, + name: t('project.node.max_num_of_sub_workflow_instances'), + validate: { + required: true + } + }, + { + type: 'input-number', + field: 'degreeOfParallelism', + span: 12, + name: t('project.node.parallelism'), + validate: { + required: true + } + }, + { + type: 'custom-parameters', + field: 'listParameters', + name: t('project.node.params_value'), + span: 24, + children: [ + { + type: 'input', + field: 'name', + span: 8, + props: { + placeholder: t('project.node.dynamic_name_tips'), + maxLength: 256 + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.dynamic_name_tips')) + } + + const sameItems = model['listParameters'].filter( + (item: { name: string }) => item.name === value + ) + + if (sameItems.length > 1) { + return new Error(t('project.node.prop_repeat')) + } + } + } + }, + { + type: 'input', + field: 'value', + span: 8, + props: { + placeholder: t('project.node.dynamic_value_tips'), + maxLength: 256 + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, 
value: string) { + if (!value) { + return new Error(t('project.node.dynamic_value_tips')) + } + } + } + }, + { + type: 'input', + field: 'separator', + span: 4, + props: { + placeholder: t('project.node.dynamic_separator_tips'), + maxLength: 256 + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.dynamic_separator_tips')) + } + } + } + } + ] + }, + { + type: 'input', + field: 'filterCondition', + span: 24, + name: t('project.node.filter_condition') + } + ] +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 8ee6140b4c..efc01d975e 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -32,7 +32,7 @@ export function formatParams(data: INodeData): { taskDefinitionJsonObj: object } { const taskParams: ITaskParams = {} - if (data.taskType === 'SUB_PROCESS') { + if (data.taskType === 'SUB_PROCESS' || data.taskType === 'DYNAMIC') { taskParams.processDefinitionCode = data.processDefinitionCode } @@ -480,6 +480,14 @@ export function formatParams(data: INodeData): { taskParams.datasource = data.datasource } + if (data.taskType === 'DYNAMIC') { + taskParams.processDefinitionCode = data.processDefinitionCode + taskParams.maxNumOfSubWorkflowInstances = data.maxNumOfSubWorkflowInstances + taskParams.degreeOfParallelism = data.degreeOfParallelism + taskParams.filterCondition = data.filterCondition + taskParams.listParameters = data.listParameters + } + let timeoutNotifyStrategy = '' if (data.timeoutNotifyStrategy) { if (data.timeoutNotifyStrategy.length === 1) { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts index 
0372421357..2d489feced 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts @@ -52,10 +52,12 @@ import { useKubeflow } from './use-kubeflow' import { useLinkis } from './use-linkis' import { useDataFactory } from './use-data-factory' import { useRemoteShell } from './use-remote-shell' +import { useDynamic } from './use-dynamic' export default { SHELL: useShell, SUB_PROCESS: useSubProcess, + DYNAMIC: useDynamic, PYTHON: usePython, SPARK: useSpark, MR: useMr, diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts new file mode 100644 index 0000000000..b078e9f85d --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import { reactive } from 'vue' +import { useRouter } from 'vue-router' +import * as Fields from '../fields/index' +import type { IJsonItem, INodeData } from '../types' +import { ITaskData } from '../types' + +export function useDynamic({ + projectCode, + from = 0, + readonly, + data +}: { + projectCode: number + from?: number + readonly?: boolean + data?: ITaskData +}) { + const router = useRouter() + const workflowCode = router.currentRoute.value.params.code + const model = reactive({ + taskType: 'DYNAMIC', + name: '', + flag: 'YES', + description: '', + timeoutFlag: false, + localParams: [], + environmentCode: null, + failRetryInterval: 1, + failRetryTimes: 0, + workerGroup: 'default', + delayTime: 0, + timeout: 30, + maxNumOfSubWorkflowInstances: 1024, + degreeOfParallelism: 1, + filterCondition: '', + listParameters: [{ name: null, value: null, separator: ',' }] + } as INodeData) + + let extra: IJsonItem[] = [] + if (from === 1) { + extra = [ + Fields.useTaskType(model, readonly), + Fields.useProcessName({ + model, + projectCode, + isCreate: !data?.id, + from, + processName: data?.processName + }) + ] + } + if (model.listParameters?.length) { + model.listParameters[0].disabled = true + } + + return { + json: [ + Fields.useName(from), + ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }), + Fields.useRunFlag(), + Fields.useDescription(), + Fields.useTaskPriority(), + Fields.useWorkerGroup(), + Fields.useEnvironmentName(model, !data?.id), + ...Fields.useTaskGroup(model, projectCode), + ...Fields.useTimeoutAlarm(model), + Fields.useChildNode({ + model, + projectCode, + from, + processName: data?.processName, + code: from === 1 ? 
0 : Number(workflowCode) + }), + ...Fields.useDynamic(model), + Fields.usePreTasks() + ] as IJsonItem[], + model + } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index e65045b140..661c43776c 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -429,6 +429,10 @@ interface ITaskParams { factoryName?: string resourceGroupName?: string pipelineName?: string + maxNumOfSubWorkflowInstances?: number + degreeOfParallelism?: number + filterCondition?: string + listParameters?: Array<any> } interface INodeData diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts index 03d523cdaf..9d6e26b640 100644 --- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts @@ -17,6 +17,7 @@ export type TaskType = | 'SHELL' | 'SUB_PROCESS' + | 'DYNAMIC' | 'PROCEDURE' | 'SQL' | 'SPARK' @@ -65,6 +66,9 @@ export const TASK_TYPES_MAP = { SUB_PROCESS: { alias: 'SUB_PROCESS' }, + DYNAMIC: { + alias: 'DYNAMIC' + }, PROCEDURE: { alias: 'PROCEDURE' }, diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss index b9a52da161..9bcb2fd7e9 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss @@ -107,6 +107,9 @@ $bgLight: #ffffff; &.icon-sub_process { background-image: url('/images/task-icons/sub_process.png'); } + &.icon-dynamic { + background-image: url('/images/task-icons/dynamic.png'); + } &.icon-data_quality { background-image: url('/images/task-icons/data_quality.png'); } 
@@ -220,6 +223,9 @@ $bgLight: #ffffff; &.icon-sub_process { background-image: url('/images/task-icons/sub_process_hover.png'); } + &.icon-dynamic { + background-image: url('/images/task-icons/dynamic_hover.png'); + } &.icon-data_quality { background-image: url('/images/task-icons/data_quality_hover.png'); }
