This is an automated email from the ASF dual-hosted git repository.

zhoujieguang pushed a commit to branch feat/task-plugin-dynamic
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 784155ce845ba800e14f57836adaeebedfa7a3c0
Author: JieguangZhou <[email protected]>
AuthorDate: Tue May 23 18:24:51 2023 +0800

    add dynamic task plugin
---
 .../api/controller/ProcessInstanceController.java  |  23 ++
 .../api/dto/DynamicSubWorkflowDto.java             |  24 ++
 .../apache/dolphinscheduler/api/enums/Status.java  |   2 +
 .../api/service/ProcessInstanceService.java        |   4 +
 .../service/impl/ProcessInstanceServiceImpl.java   |  68 +++++
 .../src/main/resources/task-type-config.yaml       |   1 +
 .../common/constants/CommandKeyConstants.java      |   2 +
 .../dolphinscheduler/common/enums/CommandType.java |   1 +
 .../common/enums/WorkflowExecutionStatus.java      |   1 +
 .../utils/placeholder/BusinessTimeUtils.java       |   1 +
 .../dao/entity/RelationSubWorkflow.java            |  50 ++++
 .../dolphinscheduler/dao/entity/TaskInstance.java  |   5 +
 .../dao/mapper/RelationSubWorkflowMapper.java      |  40 +++
 .../dao/repository/ProcessInstanceDao.java         |   2 +
 .../repository/impl/ProcessInstanceDaoImpl.java    |   9 +
 .../dao/mapper/RelationSubWorkflowMapper.xml       |  44 ++++
 .../main/resources/sql/dolphinscheduler_mysql.sql  |  13 +
 .../master/event/TaskTimeoutStateEventHandler.java |   3 +-
 .../master/runner/WorkflowExecuteRunnable.java     |  24 +-
 .../MasterTaskExecuteRunnableFactoryBuilder.java   |   4 +-
 .../dynamic/DynamicAsyncTaskExecuteFunction.java   | 178 +++++++++++++
 .../runner/task/dynamic/DynamicCommandUtils.java   |  68 +++++
 .../runner/task/dynamic/DynamicLogicTask.java      | 290 +++++++++++++++++++++
 .../dynamic/DynamicLogicTaskPluginFactory.java     |  72 +++++
 .../master/runner/task/dynamic/DynamicOutput.java  |  16 ++
 .../task/subworkflow/SubWorkflowLogicTask.java     |   4 +-
 .../server/master/utils/TaskUtils.java             |   4 +-
 .../service/process/ProcessService.java            |   3 +
 .../service/process/ProcessServiceImpl.java        |   6 +-
 .../service/subworkflow/SubWorkflowService.java    |  29 +++
 .../subworkflow/SubWorkflowServiceImpl.java        | 102 ++++++++
 .../plugin/task/api/TaskConstants.java             |   2 +
 .../plugin/task/api/TaskPluginManager.java         |   3 +
 .../task/api/model/DynamicInputParameter.java      |  16 ++
 .../task/api/parameters/DynamicParameters.java     |  53 ++++
 .../public/images/task-icons/dynamic.png           | Bin 0 -> 10606 bytes
 .../public/images/task-icons/dynamic_hover.png     | Bin 0 -> 11229 bytes
 dolphinscheduler-ui/src/common/common.ts           |  17 +-
 dolphinscheduler-ui/src/common/types.ts            |   1 +
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  12 +-
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  12 +-
 dolphinscheduler-ui/src/store/project/task-type.ts |   3 +
 dolphinscheduler-ui/src/store/project/types.ts     |   1 +
 .../projects/task/components/node/detail-modal.tsx |   2 +-
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-dynamic.ts     | 120 +++++++++
 .../projects/task/components/node/format-data.ts   |  10 +-
 .../projects/task/components/node/tasks/index.ts   |   3 +
 .../task/components/node/tasks/use-dynamic.ts      |  96 +++++++
 .../views/projects/task/components/node/types.ts   |   4 +
 .../src/views/projects/task/constants/task-type.ts |   4 +
 .../workflow/components/dag/dag.module.scss        |   6 +
 52 files changed, 1444 insertions(+), 15 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index b64f51a70b..947cb05951 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller;
 import static 
org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
 
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
@@ -319,6 +320,28 @@ public class ProcessInstanceController extends 
BaseController {
         return returnDataList(result);
     }
 
+    /**
+     * query the sub workflow instances generated by a dynamic task instance
+     *
+     * @param loginUser login user
+     * @param taskId id of the dynamic task instance
+     * @return result wrapping the list returned by the process instance service for this task instance
+     */
+    @Operation(summary = "queryDynamicSubWorkflowInstances", description = "QUERY_DYNAMIC_SUBPROCESS_INSTANCE_BY_TASK_CODE_NOTES")
+    @Parameters({
+            @Parameter(name = "taskId", description = "taskInstanceId", required = true, schema = @Schema(implementation = int.class, example = "100"))
+    })
+    @GetMapping(value = "/query-dynamic-sub-workflows")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result<List<DynamicSubWorkflowDto>> queryDynamicSubWorkflowInstances(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                                @RequestParam("taskId") Integer taskId) {
+        List<DynamicSubWorkflowDto> dynamicSubWorkflowDtos =
+                processInstanceService.queryDynamicSubWorkflowInstances(loginUser, taskId);
+        // diamond instead of the raw type, so the declared generic return type is checked by the compiler
+        return new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg(), dynamicSubWorkflowDtos);
+    }
+
     /**
      * query process instance global variables and local variables
      *
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java
new file mode 100644
index 0000000000..b688aa4066
--- /dev/null
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java
@@ -0,0 +1,24 @@
+package org.apache.dolphinscheduler.api.dto;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+
+import java.util.Map;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+/** One sub-workflow instance generated by a dynamic task, as returned by the query-dynamic-sub-workflows API. */
+@Data
+@NoArgsConstructor
+public class DynamicSubWorkflowDto {
+    /** id of the sub-workflow (process) instance */
+    private long processInstanceId;
+    /** name of the process definition the sub-workflow instances were created from */
+    private String name;
+    /** 1-based position of this instance among its siblings, in ascending instance-id order */
+    private long index;
+    /** the "dynamicParams" entry of the instance's command param, parsed into a map */
+    private Map<String, String> parameters;
+    /** execution state of the sub-workflow instance */
+    private WorkflowExecutionStatus state;
+
+}
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index d29235439d..4ad45709cf 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -271,6 +271,8 @@ public enum Status {
     NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"),
     STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair", 
"状态码前后不一致或状态码和code不匹配"),
 
+    TASK_INSTANCE_NOT_DYNAMIC_TASK(10213, "task instance {0} is not dynamic", 
"任务实例[{0}]不是Dynamic类型"),
+
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
     RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 304d0139e2..ccb3f98b4d 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -18,6 +18,7 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
 import 
org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -135,6 +136,9 @@ public interface ProcessInstanceService {
                                                         long projectCode,
                                                         Integer taskId);
 
+    List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User 
loginUser,
+                                                                 Integer 
taskId);
+
     /**
      * update process instance
      *
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 26606caa3b..da165db9a0 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -30,6 +30,7 @@ import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
 
 import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
+import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
 import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
 import org.apache.dolphinscheduler.api.dto.gantt.Task;
 import 
org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
@@ -44,6 +45,7 @@ import 
org.apache.dolphinscheduler.api.service.TaskInstanceService;
 import org.apache.dolphinscheduler.api.service.UsersService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
@@ -57,6 +59,7 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
 import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
 import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -66,6 +69,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
@@ -93,6 +97,7 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -184,6 +189,9 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
     @Autowired
     private ScheduleMapper scheduleMapper;
 
+    @Autowired
+    private RelationSubWorkflowMapper relationSubWorkflowMapper;
+
     @Autowired
     private AlertDao alertDao;
 
@@ -488,6 +496,66 @@ public class ProcessInstanceServiceImpl extends 
BaseServiceImpl implements Proce
         return result;
     }
 
+    /**
+     * Lists the sub-workflow instances generated by dynamic task instance {@code taskId}, in ascending instance-id order.
+     * Throws {@link ServiceException} if the task, its definition, its sub-workflows or their definition is missing.
+     */
+    @Override
+    public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
+        TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId);
+        Map<String, Object> result = new HashMap<>();
+        if (taskInstance == null) {
+            putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+            throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+        }
+
+        TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskInstance.getTaskCode());
+        if (taskDefinition == null) {
+            putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+            throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+        }
+        // The task exists but has the wrong type: report "not dynamic" (with its name), not "does not exist".
+        if (!taskInstance.isDynamic()) {
+            putMsg(result, Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName());
+            throw new ServiceException(Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName());
+        }
+        List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper
+                .selectAllSubProcessInstance(Long.valueOf(taskInstance.getProcessInstanceId()),
+                        taskInstance.getTaskCode());
+        List<Long> allSubProcessInstanceId = relationSubWorkflows.stream()
+                .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList());
+        List<ProcessInstance> allSubWorkflows = processInstanceDao.selectBatchIds(allSubProcessInstanceId);
+        if (allSubWorkflows == null || allSubWorkflows.isEmpty()) {
+            putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
+            throw new ServiceException(Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
+        }
+        Long subWorkflowCode = allSubWorkflows.get(0).getProcessDefinitionCode();
+        int subWorkflowVersion = allSubWorkflows.get(0).getProcessDefinitionVersion();
+        ProcessDefinition subProcessDefinition =
+                processService.findProcessDefinition(subWorkflowCode, subWorkflowVersion);
+        if (subProcessDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode);
+            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode);
+        }
+        // Number the instances 1..n in ascending instance-id order.
+        allSubWorkflows.sort(Comparator.comparing(ProcessInstance::getId));
+        List<DynamicSubWorkflowDto> allDynamicSubWorkflowDtos = new ArrayList<>();
+        int index = 1;
+        for (ProcessInstance processInstance : allSubWorkflows) {
+            DynamicSubWorkflowDto dynamicSubWorkflowDto = new DynamicSubWorkflowDto();
+            dynamicSubWorkflowDto.setProcessInstanceId(processInstance.getId());
+            dynamicSubWorkflowDto.setIndex(index);
+            dynamicSubWorkflowDto.setState(processInstance.getState());
+            dynamicSubWorkflowDto.setName(subProcessDefinition.getName());
+            Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+            String parameter = commandParamMap.get(CommandKeyConstants.CMD_DYNAMIC_START_PARAMS);
+            dynamicSubWorkflowDto.setParameters(JSONUtils.toMap(parameter));
+            allDynamicSubWorkflowDtos.add(dynamicSubWorkflowDto);
+            index++;
+        }
+        return allDynamicSubWorkflowDtos;
+    }
+
     /**
      * add dependent result for dependent task
      */
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml 
b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
index 7a21c36946..9105d50697 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
@@ -40,6 +40,7 @@ task:
     - 'DEPENDENT'
     - 'CONDITIONS'
     - 'SWITCH'
+    - 'DYNAMIC'
   dataIntegration:
     - 'SEATUNNEL'
     - 'DATAX'
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
index c1f801257a..68dfdd3a6f 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
@@ -45,6 +45,8 @@ public class CommandKeyConstants {
 
     public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
 
+    public static final String CMD_DYNAMIC_START_PARAMS = "dynamicParams";
+
     /**
      * complement data start date
      */
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 7aa2b881b3..27a2dc7ac3 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -56,6 +56,7 @@ public enum CommandType {
     RECOVER_WAITING_THREAD(10, "recover waiting thread"),
     RECOVER_SERIAL_WAIT(11, "recover serial wait"),
     EXECUTE_TASK(12, "start a task node in a process instance"),
+    DYNAMIC_GENERATION(13, "dynamic generation"),
     ;
 
     CommandType(int code, String descp) {
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
index 4fced36110..e8ec2dbc08 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
@@ -40,6 +40,7 @@ public enum WorkflowExecutionStatus {
     SERIAL_WAIT(14, "serial wait"),
     READY_BLOCK(15, "ready block"),
     BLOCK(16, "block"),
+    WAIT_TO_RUN(17, "wait to run"),
     ;
 
     private static final Map<Integer, WorkflowExecutionStatus> CODE_MAP = new 
HashMap<>();
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
index 9d30aa71ca..6d6e5497d4 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
@@ -59,6 +59,7 @@ public class BusinessTimeUtils {
             case RECOVER_SUSPENDED_PROCESS:
             case START_FAILURE_TASK_PROCESS:
             case REPEAT_RUNNING:
+            case DYNAMIC_GENERATION:
             case SCHEDULER:
             default:
                 businessDate = addDays(new Date(), -1);
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java
new file mode 100644
index 0000000000..bc19a9d130
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+import lombok.Data;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+/** Row of t_ds_relation_sub_workflow: links a parent workflow instance and task code to one sub-workflow instance. */
+@Data
+@TableName("t_ds_relation_sub_workflow")
+public class RelationSubWorkflow {
+
+    /**
+     * primary key (auto-increment); NOTE(review): the MySQL column is bigint while this field is Integer
+     */
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    /**
+     * id of the parent workflow (process) instance
+     */
+    private Long parentWorkflowInstanceId;
+
+    /**
+     * code of the parent task (a task code, not a task instance id)
+     */
+    private Long parentTaskCode;
+
+    /**
+     * id of the sub-workflow (process) instance
+     */
+    private Long subWorkflowInstanceId;
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index cf5eb62ec7..0a0b4876e4 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -21,6 +21,7 @@ import static 
org.apache.dolphinscheduler.common.constants.Constants.SEC_2_MINUT
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DYNAMIC;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
 
@@ -357,6 +358,10 @@ public class TaskInstance implements Serializable {
         return TASK_TYPE_DEPENDENT.equalsIgnoreCase(this.taskType);
     }
 
+    public boolean isDynamic() {
+        return TASK_TYPE_DYNAMIC.equalsIgnoreCase(this.taskType);
+    }
+
     public boolean isConditionsTask() {
         return TASK_TYPE_CONDITIONS.equalsIgnoreCase(this.taskType);
     }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java
new file mode 100644
index 0000000000..8a3f0d350d
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * MyBatis mapper for t_ds_relation_sub_workflow (parent workflow instance / task code to sub-workflow instance)
+ */
+public interface RelationSubWorkflowMapper extends 
BaseMapper<RelationSubWorkflow> {
+    // one multi-row INSERT for all given relations; NOTE(review): an empty list renders "values" with no tuples
+    int batchInsert(@Param("relationSubWorkflows") List<RelationSubWorkflow> 
relationSubWorkflows);
+    // all relation rows matching both the parent workflow instance id and the parent task code
+    List<RelationSubWorkflow> 
selectAllSubProcessInstance(@Param("parentWorkflowInstanceId") Long 
parentWorkflowInstanceId,
+                                                          
@Param("parentTaskCode") Long parentTaskCode);
+    // the relation row whose sub_workflow_instance_id equals the given id
+    RelationSubWorkflow 
queryParentWorkflowInstance(@Param("subWorkflowInstanceId") Long 
subWorkflowInstanceId);
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index 0144356dfe..720c46d898 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -38,6 +38,8 @@ public interface ProcessInstanceDao {
      */
     public int upsertProcessInstance(ProcessInstance processInstance);
 
+    List<ProcessInstance> selectBatchIds(List<Long> processInstanceIds);
+
     void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
 
     void deleteById(Integer workflowInstanceId);
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 5117145fd4..8a8402f686 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -27,6 +27,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 
 import org.apache.commons.collections4.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -65,6 +66,14 @@ public class ProcessInstanceDaoImpl implements 
ProcessInstanceDao {
         }
     }
 
+    @Override
+    public List<ProcessInstance> selectBatchIds(List<Long> processInstanceIds) 
{
+        if (CollectionUtils.isEmpty(processInstanceIds)) {
+            return new ArrayList<>();
+        }
+        return processInstanceMapper.selectBatchIds(processInstanceIds);
+    }
+
     @Override
     public void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds) {
         if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) {
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml
new file mode 100644
index 0000000000..4f3687b212
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper">
+    <sql id="baseSql">
+        id, parent_workflow_instance_id, parent_task_code, 
sub_workflow_instance_id
+    </sql>
+    <insert id="batchInsert">
+        insert into t_ds_relation_sub_workflow (parent_workflow_instance_id, 
parent_task_code, sub_workflow_instance_id)
+        values
+        <foreach collection="relationSubWorkflows" item="relationSubWorkflow" 
separator=",">
+            (#{relationSubWorkflow.parentWorkflowInstanceId}, 
#{relationSubWorkflow.parentTaskCode}, 
#{relationSubWorkflow.subWorkflowInstanceId})
+        </foreach>
+    </insert>
+    <select id="selectAllSubProcessInstance" 
resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow">
+        select
+        id, parent_workflow_instance_id, parent_task_code, 
sub_workflow_instance_id
+        FROM t_ds_relation_sub_workflow
+        WHERE parent_workflow_instance_id = #{parentWorkflowInstanceId}
+            AND parent_task_code = #{parentTaskCode}
+    </select>
+    <select id="queryParentWorkflowInstance" 
resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow">
+        select
+            id, parent_workflow_instance_id, parent_task_code, 
sub_workflow_instance_id
+        FROM t_ds_relation_sub_workflow
+        WHERE sub_workflow_instance_id = #{subWorkflowInstanceId}
+    </select>
+</mapper>
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index d45305336b..70ec0ab255 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -2016,3 +2016,16 @@ CREATE TABLE `t_ds_trigger_relation` (
     KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`),
     UNIQUE KEY `t_ds_trigger_relation_UN` 
(`trigger_type`,`job_id`,`trigger_code`)
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
+
+-- Relation between a parent workflow instance / parent task code and its sub-workflow instances
+DROP TABLE IF EXISTS `t_ds_relation_sub_workflow`;
+CREATE TABLE `t_ds_relation_sub_workflow` (
+    `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'key',
+    `parent_workflow_instance_id` bigint NOT NULL COMMENT 'parent process instance id',
+    `parent_task_code` bigint NOT NULL COMMENT 'parent task code',
+    `sub_workflow_instance_id` bigint NOT NULL COMMENT 'child process instance id',
+    PRIMARY KEY (`id`),
+    KEY `idx_parent_workflow_instance_id` (`parent_workflow_instance_id`),
+    KEY `idx_parent_task_code` (`parent_task_code`),
+    KEY `idx_sub_workflow_instance_id` (`sub_workflow_instance_id`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index e01955d81a..16476ad7f7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -37,7 +37,7 @@ public class TaskTimeoutStateEventHandler implements 
StateEventHandler {
 
     @Override
     public boolean handleStateEvent(WorkflowExecuteRunnable 
workflowExecuteRunnable,
-                                    StateEvent stateEvent) throws 
StateEventHandleError {
+                                    StateEvent stateEvent) throws 
StateEventHandleError, StateEventHandleException {
         TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
 
         TaskMetrics.incTaskInstanceByState("timeout");
@@ -62,6 +62,7 @@ public class TaskTimeoutStateEventHandler implements 
StateEventHandler {
                 || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) {
             if 
(taskExecuteRunnableMap.containsKey(taskInstance.getTaskCode())) {
                 
taskExecuteRunnableMap.get(taskInstance.getTaskCode()).timeout();
+                workflowExecuteRunnable.taskFinished(taskInstance);
             } else {
                 log.warn(
                         "cannot find the task processor for task {}, so skip 
task processor action.",
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7f77a5c99f..e8cdeb9069 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -446,8 +446,7 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
 
             if (taskInstance.getState().isSuccess()) {
                 completeTaskMap.put(taskInstance.getTaskCode(), 
taskInstance.getId());
-                // todo: merge the last taskInstance
-                processInstance.setVarPool(taskInstance.getVarPool());
+                mergeTaskInstanceVarPool(taskInstance);
                 processInstanceDao.upsertProcessInstance(processInstance);
                 // save the cacheKey only if the task is defined as cache task 
and the task is success
                 if (taskInstance.getIsCache().equals(Flag.YES)) {
@@ -2203,4 +2202,25 @@ public class WorkflowExecuteRunnable implements 
Callable<WorkflowSubmitStatus> {
     private boolean isExecutedOnMaster(String host) {
         return host.endsWith(masterAddress.split(Constants.COLON)[1]);
     }
+
+    /**
+     * Merges the var pool of the given task instance into the var pool of the
+     * current process instance.
+     *
+     * <p>Entries are matched by {@code Property#getProp()}: a process-level entry is
+     * dropped when the task var pool contains an entry with the same prop, and all
+     * task entries are then appended. Only the in-memory {@code processInstance} is
+     * updated here; nothing is persisted by this method.
+     *
+     * @param taskInstance the task instance whose var pool should be merged
+     */
+    private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
+        String taskVarPoolJson = taskInstance.getVarPool();
+        // the task produced no variables: keep the process var pool untouched
+        if (StringUtils.isEmpty(taskVarPoolJson)) {
+            return;
+        }
+        String processVarPoolJson = processInstance.getVarPool();
+        // nothing to merge with: adopt the task var pool as-is
+        if (StringUtils.isEmpty(processVarPoolJson)) {
+            processInstance.setVarPool(taskVarPoolJson);
+            return;
+        }
+        List<Property> processVarPool = new 
ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class));
+        List<Property> taskVarPool = JSONUtils.toList(taskVarPoolJson, 
Property.class);
+        // props defined by the task take precedence over same-named process entries
+        Set<String> newProcessVarPoolKeys = 
taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet());
+        processVarPool = processVarPool.stream().filter(property -> 
!newProcessVarPoolKeys.contains(property.getProp()))
+                .collect(Collectors.toList());
+
+        processVarPool.addAll(taskVarPool);
+
+        processInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
+    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
index f196d85153..c5689f6a1b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.runner.execute;
 
 import 
org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask;
+import 
org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask;
 import 
org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask;
 
 import java.util.Set;
@@ -38,7 +39,8 @@ public class MasterTaskExecuteRunnableFactoryBuilder {
 
     private static final Set<String> ASYNC_TASK_TYPE = Sets.newHashSet(
             DependentLogicTask.TASK_TYPE,
-            SubWorkflowLogicTask.TASK_TYPE);
+            SubWorkflowLogicTask.TASK_TYPE,
+            DynamicLogicTask.TASK_TYPE);
 
     public MasterDelayTaskExecuteRunnableFactory<? extends 
MasterDelayTaskExecuteRunnable> 
createWorkerDelayTaskExecuteRunnableFactory(String taskType) {
         if (ASYNC_TASK_TYPE.contains(taskType)) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java
new file mode 100644
index 0000000000..b433b6f3bc
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_DYNAMIC_START_PARAMS;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
+import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DynamicAsyncTaskExecuteFunction implements 
AsyncTaskExecuteFunction {
+
+    private static final Duration TASK_EXECUTE_STATE_CHECK_INTERVAL = 
Duration.ofSeconds(10);
+
+    private static final String OUTPUT_KEY = "dynamic.out";
+
+    private final ProcessInstance processInstance;
+
+    private final TaskInstance taskInstance;
+
+    private final SubWorkflowService subWorkflowService;
+
+    private final CommandMapper commandMapper;
+
+    private final int degreeOfParallelism;
+
+    private final DynamicLogicTask logicTask;
+
+    public DynamicAsyncTaskExecuteFunction(TaskExecutionContext 
taskExecutionContext,
+                                           ProcessInstance processInstance,
+                                           TaskInstance taskInstance,
+                                           DynamicLogicTask dynamicLogicTask,
+                                           CommandMapper commandMapper,
+                                           SubWorkflowService 
subWorkflowService,
+                                           int degreeOfParallelism) {
+        this.processInstance = processInstance;
+        this.taskInstance = taskInstance;
+        this.logicTask = dynamicLogicTask;
+        this.degreeOfParallelism = degreeOfParallelism;
+
+        this.commandMapper = commandMapper;
+        this.subWorkflowService = subWorkflowService;
+    }
+
+    @Override
+    public @NonNull AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() {
+        List<ProcessInstance> allSubProcessInstance = 
getAllSubProcessInstance();
+        int totalSubProcessInstanceCount = allSubProcessInstance.size();
+
+        List<ProcessInstance> finishedSubProcessInstance =
+                
subWorkflowService.filterFinishProcessInstances(allSubProcessInstance);
+
+        if (finishedSubProcessInstance.size() == totalSubProcessInstanceCount) 
{
+            log.info("all sub process instance finish");
+            int successCount = 
subWorkflowService.filterSuccessProcessInstances(finishedSubProcessInstance).size();
+            log.info("success sub process instance count: {}", successCount);
+            if (successCount == totalSubProcessInstanceCount) {
+                log.info("all sub process instance success");
+                setOutputParameters();
+                return AsyncTaskExecutionStatus.SUCCESS;
+            } else {
+                int failedCount = totalSubProcessInstanceCount - successCount;
+                log.info("failed sub process instance count: {}", failedCount);
+                return AsyncTaskExecutionStatus.FAILED;
+            }
+        }
+
+        if (logicTask.isCancel()) {
+            return AsyncTaskExecutionStatus.FAILED;
+        }
+
+        int runningCount = 
subWorkflowService.filterRunningProcessInstances(allSubProcessInstance).size();
+        int startCount = degreeOfParallelism - runningCount;
+        if (startCount > 0) {
+            log.info("There are {} sub process instances that can be started", 
startCount);
+            startSubProcessInstances(allSubProcessInstance, startCount);
+        }
+        // query the status of sub workflow instance
+        return AsyncTaskExecutionStatus.RUNNING;
+    }
+
+    private void setOutputParameters() {
+        log.info("set varPool");
+        List<ProcessInstance> allSubProcessInstance = 
getAllSubProcessInstance();
+
+        List<DynamicOutput> dynamicOutputs = new ArrayList<>();
+        int index = 1;
+        for (ProcessInstance processInstance : allSubProcessInstance) {
+            DynamicOutput dynamicOutput = new DynamicOutput();
+            Map<String, String> dynamicParams =
+                    
JSONUtils.toMap(JSONUtils.toMap(processInstance.getCommandParam()).get(CMD_DYNAMIC_START_PARAMS));
+            dynamicOutput.setDynParams(dynamicParams);
+
+            Map<String, String> outputValueMap = new HashMap<>();
+            List<Property> propertyList = 
subWorkflowService.getWorkflowOutputParameters(processInstance);
+            for (Property property : propertyList) {
+                outputValueMap.put(property.getProp(), property.getValue());
+            }
+
+            dynamicOutput.setOutputValue(outputValueMap);
+            dynamicOutput.setMappedTimes(index++);
+            dynamicOutputs.add(dynamicOutput);
+        }
+
+        Property property = new Property();
+        property.setProp(String.format("%s(%s)", OUTPUT_KEY, 
taskInstance.getName()));
+        property.setDirect(Direct.OUT);
+        property.setType(DataType.VARCHAR);
+        property.setValue(JSONUtils.toJsonString(dynamicOutputs));
+
+        List<Property> taskPropertyList = new 
ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
+        taskPropertyList.add(property);
+        
logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));
+
+        log.info("set property: {}", property);
+    }
+
+    private void startSubProcessInstances(List<ProcessInstance> 
allSubProcessInstance, int startCount) {
+        List<ProcessInstance> waitingProcessInstances =
+                
subWorkflowService.filterWaitToRunProcessInstances(allSubProcessInstance);
+
+        for (int i = 0; i < Math.min(startCount, 
waitingProcessInstances.size()); i++) {
+            ProcessInstance subProcessInstance = 
waitingProcessInstances.get(i);
+            Map<String, String> parameters = 
JSONUtils.toMap(DynamicCommandUtils
+                    
.getDataFromCommandParam(subProcessInstance.getCommandParam(), 
CMD_DYNAMIC_START_PARAMS));
+            Command command = 
DynamicCommandUtils.createCommand(this.processInstance,
+                    subProcessInstance.getProcessDefinitionCode(), 
subProcessInstance.getProcessDefinitionVersion(),
+                    parameters);
+            command.setProcessInstanceId(subProcessInstance.getId());
+            commandMapper.insert(command);
+            log.info("start sub process instance, sub process instance id: {}, 
command: {}", subProcessInstance.getId(),
+                    command);
+        }
+    }
+
+    public List<ProcessInstance> getAllSubProcessInstance() {
+        return 
subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), 
taskInstance.getTaskCode());
+    }
+
+    @Override
+    public @NonNull Duration getAsyncTaskStateCheckInterval() {
+        return TASK_EXECUTE_STATE_CHECK_INTERVAL;
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java
new file mode 100644
index 0000000000..2acc9752eb
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java
@@ -0,0 +1,68 @@
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Helpers for building the {@link Command} that starts a sub workflow of a dynamic task and for
+ * reading / writing single entries of a command's JSON parameter map.
+ */
+public class DynamicCommandUtils {
+
+    /**
+     * Creates a command for a sub workflow, inheriting execution settings from the parent process instance.
+     *
+     * <p>The start parameters written to the command are the given {@code parameters} plus the parent's
+     * global parameters; a global parameter replaces a dynamic parameter with the same name. The
+     * {@code parameters} map passed in is not modified.
+     *
+     * @param processInstance             the parent process instance
+     * @param subProcessDefinitionCode    code of the sub workflow definition
+     * @param subProcessDefinitionVersion version of the sub workflow definition
+     * @param parameters                  dynamic parameters for this sub workflow run, may be {@code null}
+     * @return the populated (not yet persisted) command
+     */
+    public static Command createCommand(ProcessInstance processInstance,
+                                        Long subProcessDefinitionCode,
+                                        Integer subProcessDefinitionVersion,
+                                        Map<String, String> parameters) {
+        Command command = new Command();
+        // enum identity comparison is null-safe, unlike calling equals on the getter result
+        if (processInstance.getCommandType() == CommandType.START_PROCESS) {
+            command.setCommandType(CommandType.DYNAMIC_GENERATION);
+        } else {
+            command.setCommandType(processInstance.getCommandType());
+        }
+        command.setProcessDefinitionCode(subProcessDefinitionCode);
+        command.setProcessDefinitionVersion(subProcessDefinitionVersion);
+        command.setTaskDependType(TaskDependType.TASK_POST);
+        command.setFailureStrategy(processInstance.getFailureStrategy());
+        command.setWarningType(processInstance.getWarningType());
+
+        // work on a copy so the caller's map is not polluted with the parent's global parameters
+        Map<String, String> startParams = parameters == null ? new HashMap<>() : new HashMap<>(parameters);
+        String globalParams = processInstance.getGlobalParams();
+        if (StringUtils.isNotEmpty(globalParams)) {
+            List<Property> parentParams = JSONUtils.toList(globalParams, Property.class);
+            for (Property parentParam : parentParams) {
+                startParams.put(parentParam.getProp(), parentParam.getValue());
+            }
+        }
+
+        addDataToCommandParam(command, CommandKeyConstants.CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
+        command.setExecutorId(processInstance.getExecutorId());
+        command.setWarningGroupId(processInstance.getWarningGroupId());
+        command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+        command.setWorkerGroup(processInstance.getWorkerGroup());
+        command.setDryRun(processInstance.getDryRun());
+        return command;
+    }
+
+    /**
+     * Reads one entry from a command parameter JSON string.
+     *
+     * @param commandParam the JSON encoded command parameter map
+     * @param key          the entry to read
+     * @return the value stored under {@code key}, or {@code null} if the map or the entry is absent
+     */
+    public static String getDataFromCommandParam(String commandParam, String key) {
+        Map<String, String> cmdParam = JSONUtils.toMap(commandParam);
+        // same null guard as addDataToCommandParam: the parsed map may be absent
+        return cmdParam == null ? null : cmdParam.get(key);
+    }
+
+    /**
+     * Adds (or replaces) one entry in the command's JSON parameter map and writes the map back to the command.
+     *
+     * @param command the command to update
+     * @param key     the entry name
+     * @param data    the entry value
+     */
+    static void addDataToCommandParam(Command command, String key, String data) {
+        Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
+        if (cmdParam == null) {
+            cmdParam = new HashMap<>();
+        }
+        cmdParam.put(key, data);
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
new file mode 100644
index 0000000000..e0b93994ad
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
+import 
org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
+import 
org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+
+@Slf4j
+public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
+
+    public static final String TASK_TYPE = "DYNAMIC";
+    private final ProcessInstanceDao processInstanceDao;
+
+    private final SubWorkflowService subWorkflowService;
+
+    private final ProcessDefinitionMapper processDefineMapper;
+
+    private final CommandMapper commandMapper;
+
+    private final ProcessService processService;
+
+    private ProcessInstance processInstance;
+
+    private TaskInstance taskInstance;
+
+    private final MasterRpcClient masterRpcClient;
+
+    private boolean haveBeenCanceled = false;
+
+    public DynamicLogicTask(TaskExecutionContext taskExecutionContext,
+                            ProcessInstanceDao processInstanceDao,
+                            TaskInstanceDao taskInstanceDao,
+                            SubWorkflowService subWorkflowService,
+                            ProcessService processService,
+                            MasterRpcClient masterRpcClient,
+                            ProcessDefinitionMapper processDefineMapper,
+                            CommandMapper commandMapper) {
+        super(taskExecutionContext,
+                JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
new TypeReference<DynamicParameters>() {
+                }));
+        this.processInstanceDao = processInstanceDao;
+        this.subWorkflowService = subWorkflowService;
+        this.processService = processService;
+        this.masterRpcClient = masterRpcClient;
+        this.processDefineMapper = processDefineMapper;
+        this.commandMapper = commandMapper;
+
+        this.processInstance =
+                
processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId());
+        this.taskInstance = 
taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
+    }
+
+    @Override
+    public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws 
MasterTaskExecuteException {
+        List<Map<String, String>> parameterGroup = generateParameterGroup();
+
+        if (parameterGroup.size() > 
taskParameters.getMaxNumOfSubWorkflowInstances()) {
+            log.warn("the number of sub process instances [{}] exceeds the 
maximum limit [{}]", parameterGroup.size(),
+                    taskParameters.getMaxNumOfSubWorkflowInstances());
+            parameterGroup = parameterGroup.subList(0, 
taskParameters.getMaxNumOfSubWorkflowInstances());
+        }
+
+        // if already exists sub process instance, do not generate again
+        List<ProcessInstance> existsSubProcessInstanceList =
+                
subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), 
taskInstance.getTaskCode());
+        if (CollectionUtils.isEmpty(existsSubProcessInstanceList)) {
+            generateSubWorkflowInstance(parameterGroup);
+        } else {
+            resetProcessInstanceStatus(existsSubProcessInstanceList);
+        }
+        return new DynamicAsyncTaskExecuteFunction(taskExecutionContext, 
processInstance, taskInstance, this,
+                commandMapper,
+                subWorkflowService, taskParameters.getDegreeOfParallelism());
+    }
+
+    public void resetProcessInstanceStatus(List<ProcessInstance> 
existsSubProcessInstanceList) {
+        switch (processInstance.getCommandType()) {
+            case REPEAT_RUNNING:
+                existsSubProcessInstanceList.forEach(processInstance -> {
+                    
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
+                    processInstanceDao.updateProcessInstance(processInstance);
+                });
+                break;
+            case START_FAILURE_TASK_PROCESS:
+            case RECOVER_TOLERANCE_FAULT_PROCESS:
+                List<ProcessInstance> failedProcessInstances =
+                        
subWorkflowService.filterFailedProcessInstances(existsSubProcessInstanceList);
+                failedProcessInstances.forEach(processInstance -> {
+                    
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
+                    processInstanceDao.updateProcessInstance(processInstance);
+                });
+                break;
+        }
+    }
+
+    public void generateSubWorkflowInstance(List<Map<String, String>> 
parameterGroup) throws MasterTaskExecuteException {
+        List<ProcessInstance> processInstanceList = new ArrayList<>();
+        ProcessDefinition subProcessDefinition =
+                
processDefineMapper.queryByCode(taskParameters.getProcessDefinitionCode());
+        for (Map<String, String> parameters : parameterGroup) {
+            String dynamicStartParams = JSONUtils.toJsonString(parameters);
+            Command command = 
DynamicCommandUtils.createCommand(processInstance, 
subProcessDefinition.getCode(),
+                    subProcessDefinition.getVersion(), parameters);
+            // todo: set id to -1? we use command to generate sub process 
instance, but the generate method will use the
+            // command id to do
+            // somethings
+            command.setId(-1);
+            DynamicCommandUtils.addDataToCommandParam(command, 
CommandKeyConstants.CMD_DYNAMIC_START_PARAMS,
+                    dynamicStartParams);
+            ProcessInstance subProcessInstance = 
createSubProcessInstance(command);
+            subProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
+            processInstanceDao.insertProcessInstance(subProcessInstance);
+            command.setProcessInstanceId(subProcessInstance.getId());
+            processInstanceList.add(subProcessInstance);
+        }
+
+        List<RelationSubWorkflow> relationSubWorkflowList = new ArrayList<>();
+        for (ProcessInstance subProcessInstance : processInstanceList) {
+            RelationSubWorkflow relationSubWorkflow = new 
RelationSubWorkflow();
+            
relationSubWorkflow.setParentWorkflowInstanceId(Long.valueOf(processInstance.getId()));
+            relationSubWorkflow.setParentTaskCode(taskInstance.getTaskCode());
+            
relationSubWorkflow.setSubWorkflowInstanceId(Long.valueOf(subProcessInstance.getId()));
+            relationSubWorkflowList.add(relationSubWorkflow);
+        }
+
+        log.info("Expected number of runs : {}, actual number of runs : {}", 
parameterGroup.size(),
+                processInstanceList.size());
+
+        int insertN = 
subWorkflowService.batchInsertRelationSubWorkflow(relationSubWorkflowList);
+        log.info("insert {} relation sub workflow", insertN);
+    }
+
+    /**
+     * Constructs a sub process instance from the given command by delegating to
+     * {@code processService.constructProcessInstance}, using the parent process instance's host.
+     *
+     * @param command the command describing the sub workflow to create
+     * @return the constructed sub process instance
+     * @throws MasterTaskExecuteException if construction fails; the original exception is kept as the cause
+     */
+    public ProcessInstance createSubProcessInstance(Command command) throws MasterTaskExecuteException {
+        try {
+            return processService.constructProcessInstance(command, processInstance.getHost());
+        } catch (Exception e) {
+            log.error("create sub process instance error", e);
+            // chain the original exception so the root failure is not lost to callers
+            throw new MasterTaskExecuteException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Expands the task's list parameters into the parameter sets for the sub workflows.
+     *
+     * <p>Each list parameter value is split by its separator, trimmed, and stripped of every value
+     * listed in the (comma separated) filter condition. The result is the cartesian product of the
+     * remaining values of all list parameters, one {@code name -> value} map per combination.
+     *
+     * @return one parameter map per sub workflow to generate
+     */
+    public List<Map<String, String>> generateParameterGroup() {
+        List<DynamicInputParameter> dynamicInputParameters = taskParameters.getListParameters();
+        // StringUtils.split returns null for a null input; default a missing filter condition to ""
+        // (which splits to an empty array) so that "no filter" does not end in a NullPointerException
+        Set<String> filterStrings =
+                Arrays.stream(StringUtils.split(StringUtils.defaultString(taskParameters.getFilterCondition()), ","))
+                        .map(String::trim)
+                        .collect(Collectors.toSet());
+
+        List<List<DynamicInputParameter>> allParameters = new ArrayList<>();
+        for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
+            List<DynamicInputParameter> singleParameters = new ArrayList<>();
+            String value = dynamicInputParameter.getValue();
+            String separator = dynamicInputParameter.getSeparator();
+            List<String> valueList = Arrays.stream(StringUtils.split(value, separator))
+                    .map(String::trim)
+                    .filter(v -> !filterStrings.contains(v))
+                    .collect(Collectors.toList());
+
+            for (String v : valueList) {
+                DynamicInputParameter singleParameter = new DynamicInputParameter();
+                singleParameter.setName(dynamicInputParameter.getName());
+                singleParameter.setValue(v);
+                singleParameters.add(singleParameter);
+            }
+            allParameters.add(singleParameters);
+        }
+
+        // use Lists.cartesianProduct to get the cartesian product of all parameters
+        List<List<DynamicInputParameter>> cartesianProduct = Lists.cartesianProduct(allParameters);
+
+        // convert cartesian product to parameter group List<Map<name:value>>
+        List<Map<String, String>> parameterGroup = cartesianProduct.stream().map(
+                inputParameterList -> inputParameterList.stream().collect(
+                        Collectors.toMap(DynamicInputParameter::getName, DynamicInputParameter::getValue)))
+                .collect(Collectors.toList());
+
+        log.info("parameter group size: {}", parameterGroup.size());
+        // log every parameter group
+        for (Map<String, String> map : parameterGroup) {
+            log.info("parameter group: {}", map);
+        }
+        return parameterGroup;
+    }
+
+    @Override
+    public void kill() {
+        try {
+            
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP);
+        } catch (MasterTaskExecuteException e) {
+            log.error("kill " + taskInstance.getName() + " error", e);
+        }
+    }
+
+    /**
+     * Marks this task as canceled and asks every running sub workflow instance to stop.
+     *
+     * <p>For each running sub process instance the state is set to {@code stopStatus}, persisted, and a
+     * state change request is sent to the master recorded as the instance's host.
+     *
+     * @param stopStatus the state to set on each running sub process instance
+     * @throws MasterTaskExecuteException if the stop request cannot be sent to a sub workflow's master
+     */
+    private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException {
+        this.haveBeenCanceled = true;
+        List<ProcessInstance> existsSubProcessInstanceList =
+                subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
+        List<ProcessInstance> runningSubProcessInstanceList =
+                subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList);
+        for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) {
+            // check the current state BEFORE overwriting it, and skip only this instance so that
+            // the remaining sub workflows still receive their stop request
+            if (subProcessInstance.getState().isFinished()) {
+                log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId());
+                continue;
+            }
+            subProcessInstance.setState(stopStatus);
+            processInstanceDao.updateProcessInstance(subProcessInstance);
+            try {
+                sendToSubProcess(taskExecutionContext, subProcessInstance);
+                log.info("Success send [{}] request to SubWorkflow's master: {}", stopStatus,
+                        subProcessInstance.getHost());
+            } catch (RemotingException e) {
+                throw new MasterTaskExecuteException(
+                        String.format("Send stop request to SubWorkflow's master: %s failed",
+                                subProcessInstance.getHost()),
+                        e);
+            }
+        }
+    }
+
+    private void sendToSubProcess(TaskExecutionContext taskExecutionContext,
+                                  ProcessInstance subProcessInstance) throws 
RemotingException {
+        WorkflowStateEventChangeRequest stateEventChangeCommand = new 
WorkflowStateEventChangeRequest(
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId(),
+                subProcessInstance.getState(),
+                subProcessInstance.getId(),
+                0);
+        Host host = new Host(subProcessInstance.getHost());
+        masterRpcClient.send(host, stateEventChangeCommand.convert2Command());
+    }
+
+    public boolean isCancel() {
+        return haveBeenCanceled;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java
new file mode 100644
index 0000000000..cbcc50ada6
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
+import org.apache.dolphinscheduler.server.master.runner.task.ILogicTaskPluginFactory;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Factory that builds a {@link DynamicLogicTask} for a task execution context, passing along
+ * the DAOs, mappers, services and RPC client injected into this component.
+ */
+@Slf4j
+@Component
+public class DynamicLogicTaskPluginFactory implements ILogicTaskPluginFactory<DynamicLogicTask> {
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Autowired
+    private ProcessDefinitionMapper processDefineMapper;
+
+    @Autowired
+    private CommandMapper commandMapper;
+
+    @Autowired
+    private ProcessService processService;
+
+    // private for consistency with the other injected fields of this factory
+    @Autowired
+    private SubWorkflowService subWorkflowService;
+
+    @Autowired
+    private MasterRpcClient masterRpcClient;
+
+    /**
+     * Create the logic task for the given context.
+     *
+     * @param taskExecutionContext context of the task instance to run
+     * @return a new {@link DynamicLogicTask} wired with this factory's collaborators
+     */
+    @Override
+    public DynamicLogicTask createLogicTask(TaskExecutionContext taskExecutionContext) {
+        return new DynamicLogicTask(
+                taskExecutionContext,
+                processInstanceDao,
+                taskInstanceDao,
+                subWorkflowService,
+                processService,
+                masterRpcClient,
+                processDefineMapper,
+                commandMapper);
+    }
+
+    /**
+     * @return the task type served by this factory, {@link DynamicLogicTask#TASK_TYPE}
+     */
+    @Override
+    public String getTaskType() {
+        return DynamicLogicTask.TASK_TYPE;
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java
new file mode 100644
index 0000000000..77e29cbac3
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import java.util.Map;
+
+import lombok.Data;
+
+/**
+ * Plain data holder used by the dynamic task package; Lombok {@code @Data} generates the
+ * accessors, {@code equals}/{@code hashCode} and {@code toString}.
+ */
+@Data
+public class DynamicOutput {
+
+    // NOTE(review): presumably the parameter combination handed to one generated sub workflow - confirm
+    private Map<String, String> dynParams;
+
+    // NOTE(review): presumably the output parameters collected from that sub workflow - confirm
+    private Map<String, String> outputValue;
+
+    // NOTE(review): presumably the ordinal of the generated sub workflow instance - confirm
+    private int mappedTimes;
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
index 366fe53cec..7b3008b581 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
@@ -123,8 +123,8 @@ public class SubWorkflowLogicTask extends 
BaseAsyncLogicTask<SubProcessParameter
             log.info("TaskInstance is null");
             return;
         }
-        if (taskInstance.getState().isFinished()) {
-            log.info("The task instance is finished, no need to pause");
+        if (subProcessInstance.getState().isFinished()) {
+            log.info("The subProcessInstance is finished, no need to pause");
             return;
         }
         
subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready 
stop by kill task");
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java
index d717e69362..eb1fb27f37 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
 import 
org.apache.dolphinscheduler.server.master.runner.task.blocking.BlockingLogicTask;
 import 
org.apache.dolphinscheduler.server.master.runner.task.condition.ConditionLogicTask;
 import 
org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask;
+import 
org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask;
 import 
org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask;
 import 
org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask;
 
@@ -38,7 +39,8 @@ public class TaskUtils {
             ConditionLogicTask.TASK_TYPE,
             DependentLogicTask.TASK_TYPE,
             SubWorkflowLogicTask.TASK_TYPE,
-            SwitchLogicTask.TASK_TYPE);
+            SwitchLogicTask.TASK_TYPE,
+            DynamicLogicTask.TASK_TYPE);
 
     public boolean isMasterTask(String taskType) {
         return MASTER_TASK_TYPES.contains(taskType);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a7e01101a7..3d2757d76a 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -64,6 +64,9 @@ public interface ProcessService {
     ProcessInstance handleCommand(String host,
                                   Command command) throws CronParseException, 
CodeGenerateUtils.CodeGenerateException;
 
+    ProcessInstance constructProcessInstance(Command command,
+                                             String host) throws 
CronParseException, CodeGenerateUtils.CodeGenerateException;
+
     Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
 
     ProcessInstance findProcessInstanceById(int processId);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index d0f56f329f..a2064ae2a8 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -744,8 +744,8 @@ public class ProcessServiceImpl implements ProcessService {
      * @param host    host
      * @return process instance
      */
-    protected @Nullable ProcessInstance constructProcessInstance(Command 
command,
-                                                                 String host) 
throws CronParseException, CodeGenerateException {
+    public @Nullable ProcessInstance constructProcessInstance(Command command,
+                                                              String host) 
throws CronParseException, CodeGenerateException {
         ProcessInstance processInstance;
         ProcessDefinition processDefinition;
         CommandType commandType = command.getCommandType();
@@ -765,6 +765,7 @@ public class ProcessServiceImpl implements ProcessService {
             processInstance = generateNewProcessInstance(processDefinition, 
command, cmdParam);
         } else {
             processInstance = 
this.findProcessInstanceDetailById(processInstanceId).orElse(null);
+            setGlobalParamIfCommanded(processDefinition, cmdParam);
             if (processInstance == null) {
                 return null;
             }
@@ -816,6 +817,7 @@ public class ProcessServiceImpl implements ProcessService {
         int runTime = processInstance.getRunTimes();
         switch (commandType) {
             case START_PROCESS:
+            case DYNAMIC_GENERATION:
                 break;
             case START_FAILURE_TASK_PROCESS:
                 // find failed tasks and init these tasks
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
new file mode 100644
index 0000000000..4c2bd9978d
--- /dev/null
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.subworkflow;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.util.List;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Operations on the sub workflow instances that belong to a task of a parent workflow instance.
+ */
+@Component
+public interface SubWorkflowService {
+
+    /**
+     * Find every sub workflow instance related to the given task of the given workflow instance.
+     */
+    List<ProcessInstance> getAllDynamicSubWorkflow(long processInstanceId, long taskCode);
+
+    /**
+     * Persist the given parent/sub workflow relations in one batch.
+     *
+     * @return the insert count reported by the persistence layer
+     */
+    int batchInsertRelationSubWorkflow(List<RelationSubWorkflow> relationSubWorkflowList);
+
+    /** Keep only the instances whose state is finished. */
+    List<ProcessInstance> filterFinishProcessInstances(List<ProcessInstance> processInstanceList);
+
+    /** Keep only the instances whose state is a success state. */
+    List<ProcessInstance> filterSuccessProcessInstances(List<ProcessInstance> processInstanceList);
+
+    /** Keep only the instances whose state is a running state. */
+    List<ProcessInstance> filterRunningProcessInstances(List<ProcessInstance> processInstanceList);
+
+    /** Keep only the instances whose state is {@code WAIT_TO_RUN}. */
+    List<ProcessInstance> filterWaitToRunProcessInstances(List<ProcessInstance> processInstanceList);
+
+    /** Keep only the instances whose state is a failure state. */
+    List<ProcessInstance> filterFailedProcessInstances(List<ProcessInstance> processInstanceList);
+
+    /**
+     * Collect the output parameters of the workflow instance: its var pool entries plus the
+     * global parameters of its workflow definition that the var pool does not already contain.
+     */
+    List<Property> getWorkflowOutputParameters(ProcessInstance processInstance);
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
new file mode 100644
index 0000000000..82b9342a5e
--- /dev/null
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.subworkflow;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link SubWorkflowService} implementation backed by the sub workflow relation mapper,
+ * the process instance DAO and the process definition log mapper.
+ */
+@Component
+public class SubWorkflowServiceImpl implements SubWorkflowService {
+
+    @Autowired
+    private RelationSubWorkflowMapper relationSubWorkflowMapper;
+
+    @Autowired
+    private ProcessInstanceDao processInstanceDao;
+
+    @Autowired
+    private ProcessDefinitionLogMapper processDefinitionLogMapper;
+
+    /**
+     * Load every sub workflow instance related to the given task of the given workflow instance,
+     * ordered by instance id.
+     *
+     * @return the sub workflow instances, or an empty list when no relation exists
+     */
+    @Override
+    public List<ProcessInstance> getAllDynamicSubWorkflow(long processInstanceId, long taskCode) {
+        List<RelationSubWorkflow> relationSubWorkflows =
+                relationSubWorkflowMapper.selectAllSubProcessInstance(processInstanceId, taskCode);
+        List<Long> allSubProcessInstanceId = relationSubWorkflows.stream()
+                .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList());
+        // no relation rows: skip the batch lookup, because an empty id list would typically be
+        // rendered as an invalid "IN ()" clause by a batch-id query
+        if (allSubProcessInstanceId.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<ProcessInstance> allSubProcessInstance = processInstanceDao.selectBatchIds(allSubProcessInstanceId);
+        allSubProcessInstance.sort(Comparator.comparing(ProcessInstance::getId));
+        return allSubProcessInstance;
+    }
+
+    /**
+     * Insert the given relations through the mapper's batch insert.
+     *
+     * @return the count returned by the mapper
+     */
+    @Override
+    public int batchInsertRelationSubWorkflow(List<RelationSubWorkflow> relationSubWorkflowList) {
+        return relationSubWorkflowMapper.batchInsert(relationSubWorkflowList);
+    }
+
+    /** Keep only the instances whose state reports {@code isFinished()}. */
+    @Override
+    public List<ProcessInstance> filterFinishProcessInstances(List<ProcessInstance> processInstanceList) {
+        return processInstanceList.stream()
+                .filter(instance -> instance.getState().isFinished()).collect(Collectors.toList());
+    }
+
+    /** Keep only the instances whose state reports {@code isSuccess()}. */
+    @Override
+    public List<ProcessInstance> filterSuccessProcessInstances(List<ProcessInstance> processInstanceList) {
+        return processInstanceList.stream()
+                .filter(instance -> instance.getState().isSuccess()).collect(Collectors.toList());
+    }
+
+    /** Keep only the instances whose state reports {@code isRunning()}. */
+    @Override
+    public List<ProcessInstance> filterRunningProcessInstances(List<ProcessInstance> processInstanceList) {
+        return processInstanceList.stream()
+                .filter(instance -> instance.getState().isRunning()).collect(Collectors.toList());
+    }
+
+    /** Keep only the instances whose state equals {@link WorkflowExecutionStatus#WAIT_TO_RUN}. */
+    @Override
+    public List<ProcessInstance> filterWaitToRunProcessInstances(List<ProcessInstance> processInstanceList) {
+        return processInstanceList.stream()
+                .filter(instance -> instance.getState().equals(WorkflowExecutionStatus.WAIT_TO_RUN))
+                .collect(Collectors.toList());
+    }
+
+    /** Keep only the instances whose state reports {@code isFailure()}. */
+    @Override
+    public List<ProcessInstance> filterFailedProcessInstances(List<ProcessInstance> processInstanceList) {
+        return processInstanceList.stream()
+                .filter(instance -> instance.getState().isFailure()).collect(Collectors.toList());
+    }
+
+    /**
+     * Collect the var pool entries of the instance and append the global parameters of its
+     * workflow definition (code + version) whose name is not already present in the var pool.
+     */
+    @Override
+    public List<Property> getWorkflowOutputParameters(ProcessInstance processInstance) {
+        List<Property> outputParamList =
+                new ArrayList<>(JSONUtils.toList(processInstance.getVarPool(), Property.class));
+
+        ProcessDefinitionLog processDefinition = processDefinitionLogMapper
+                .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion());
+        // no matching definition log row: fall back to the var pool instead of failing with an NPE
+        if (processDefinition == null) {
+            return outputParamList;
+        }
+        List<Property> globalParamList = JSONUtils.toList(processDefinition.getGlobalParams(), Property.class);
+
+        Set<String> outputParamSet = outputParamList.stream().map(Property::getProp).collect(Collectors.toSet());
+
+        // append the global parameters whose name is not already present in the var pool
+        globalParamList.stream().filter(globalParam -> !outputParamSet.contains(globalParam.getProp()))
+                .forEach(outputParamList::add);
+
+        return outputParamList;
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index bb7c8fc439..44ced03e27 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -400,6 +400,8 @@ public class TaskConstants {
 
     public static final String TASK_TYPE_SUB_PROCESS = "SUB_PROCESS";
 
+    public static final String TASK_TYPE_DYNAMIC = "DYNAMIC";
+
     public static final String TASK_TYPE_DEPENDENT = "DEPENDENT";
 
     public static final String TASK_TYPE_SQL = "SQL";
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
index d16121f150..dff762300e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
@@ -22,6 +22,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@@ -102,6 +103,8 @@ public class TaskPluginManager {
                 return JSONUtils.parseObject(parametersNode.getTaskParams(), 
DependentParameters.class);
             case TaskConstants.TASK_TYPE_BLOCKING:
                 return JSONUtils.parseObject(parametersNode.getTaskParams(), 
BlockingParameters.class);
+            case TaskConstants.TASK_TYPE_DYNAMIC:
+                return JSONUtils.parseObject(parametersNode.getTaskParams(), 
DynamicParameters.class);
             default:
                 TaskChannel taskChannel = 
this.getTaskChannelMap().get(taskType);
                 if (Objects.isNull(taskChannel)) {
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java
new file mode 100644
index 0000000000..7582a23656
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.model;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+/**
+ * One named input of a dynamic task: a name, a value and a separator that defaults to
+ * {@code ","}.
+ */
+@Data
+@NoArgsConstructor
+public class DynamicInputParameter {
+
+    @NonNull
+    private String name;
+    @NonNull
+    private String value;
+    // NOTE(review): presumably used to split {@code value} into individual items - confirm
+    private String separator = ",";
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java
new file mode 100644
index 0000000000..e918db3b18
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.parameters;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
+
+import java.util.List;
+
+import lombok.Data;
+
+/**
+ * Task parameters of the {@code DYNAMIC} task type.
+ */
+@Data
+public class DynamicParameters extends AbstractParameters {
+
+    /**
+     * code of the process definition referenced by this task
+     */
+    private long processDefinitionCode;
+
+    private int maxNumOfSubWorkflowInstances;
+
+    private int degreeOfParallelism;
+
+    private String filterCondition;
+
+    private List<DynamicInputParameter> listParameters;
+
+    /**
+     * The parameters are valid when at least one list parameter is configured and a process
+     * definition code is set (non-zero).
+     */
+    @Override
+    public boolean checkParameters() {
+        // plain null/empty checks cannot throw, so no try/catch is needed here
+        if (listParameters == null || listParameters.isEmpty()) {
+            return false;
+        }
+        return processDefinitionCode != 0;
+    }
+}
diff --git a/dolphinscheduler-ui/public/images/task-icons/dynamic.png 
b/dolphinscheduler-ui/public/images/task-icons/dynamic.png
new file mode 100644
index 0000000000..6df7485872
Binary files /dev/null and 
b/dolphinscheduler-ui/public/images/task-icons/dynamic.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png 
b/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png
new file mode 100644
index 0000000000..b8b43135cc
Binary files /dev/null and 
b/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png differ
diff --git a/dolphinscheduler-ui/src/common/common.ts 
b/dolphinscheduler-ui/src/common/common.ts
index aa98186c5d..1e2892064e 100644
--- a/dolphinscheduler-ui/src/common/common.ts
+++ b/dolphinscheduler-ui/src/common/common.ts
@@ -30,7 +30,8 @@ import {
   StopOutlined,
   IssuesCloseOutlined,
   SendOutlined,
-  HistoryOutlined
+  HistoryOutlined,
+  HourglassOutlined
 } from '@vicons/antd'
 import { format, parseISO } from 'date-fns'
 import _ from 'lodash'
@@ -123,6 +124,10 @@ export const runningType = (t: any) => [
   {
     desc: `${t('project.workflow.execute_task')}`,
     code: 'EXECUTE_TASK'
+  },
+  {
+    desc: `${t('project.workflow.dynamic_generation')}`,
+    code: 'DYNAMIC_GENERATION'
   }
 ]
 
@@ -378,7 +383,15 @@ export const workflowExecutionState = (t: any): 
IWorkflowExecutionStateConfig =>
     icon: HistoryOutlined,
     isSpin: false,
     classNames: 'pending'
-  }
+  },
+  WAIT_TO_RUN: {
+    id: 18,
+    desc: `${t('project.overview.wait_to_run')}`,
+    color: '#5102ce',
+    icon: HourglassOutlined,
+    isSpin: false,
+    classNames: 'wait_to_run'
+  },
 })
 
 /**
diff --git a/dolphinscheduler-ui/src/common/types.ts 
b/dolphinscheduler-ui/src/common/types.ts
index a07a13c005..ec40e638b5 100644
--- a/dolphinscheduler-ui/src/common/types.ts
+++ b/dolphinscheduler-ui/src/common/types.ts
@@ -41,6 +41,7 @@ export type IWorkflowExecutionState =
   | 'SERIAL_WAIT'
   | 'READY_BLOCK'
   | 'BLOCK'
+  | 'WAIT_TO_RUN'
 
 export type ITaskStateConfig = {
   [key in ITaskState]: {
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts 
b/dolphinscheduler-ui/src/locales/en_US/project.ts
index f6e072960e..dff1a0c18b 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -168,6 +168,7 @@ export default {
     recovery_waiting_thread: 'Recovery waiting thread',
     recover_serial_wait: 'Recover serial wait',
     execute_task: 'Execute the specified task',
+    dynamic_generation: 'Dynamic Generation',
     recovery_suspend: 'Recovery Suspend',
     recovery_failed: 'Recovery Failed',
     gantt: 'Gantt',
@@ -832,7 +833,16 @@ export default {
     dependent_failure_policy_waiting: 'waiting',
     dependent_failure_waiting_time: 'Dependent failure waiting time',
     dependent_failure_waiting_time_tips:
-      'Failure waiting time must be a positive integer'
+      'Failure waiting time must be a positive integer',
+    max_num_of_sub_workflow_instances: 'Max num of sub workflow instances',
+    filter_condition: 'Filter Condition',
+    params_value: 'Params Value',
+    separator: 'Separator',
+    dynamic_name_tips: 'name(required)',
+    dynamic_value_tips: 'params or value(required)',
+    dynamic_separator_tips: 'separator(required)',
+    child_node_definition: 'child node definition',
+    child_node_instance: 'child node instance',
   },
   menu: {
     fav: 'Favorites',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts 
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index ccb68b80aa..e469d0fce8 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -169,6 +169,7 @@ export default {
     recovery_waiting_thread: '恢复等待线程',
     recover_serial_wait: '串行恢复',
     execute_task: '执行指定任务',
+    dynamic_generation: '动态生成',
     recovery_suspend: '恢复运行',
     recovery_failed: '重跑失败任务',
     gantt: '甘特图',
@@ -808,7 +809,16 @@ export default {
     dependent_failure_policy_failure: '失败',
     dependent_failure_policy_waiting: '等待',
     dependent_failure_waiting_time: '依赖失败等待时间',
-    dependent_failure_waiting_time_tips: '失败等待时间必须为正整数'
+    dependent_failure_waiting_time_tips: '失败等待时间必须为正整数',
+    max_num_of_sub_workflow_instances: '动态生成实例上限',
+    filter_condition: '过滤条件',
+    params_value: '取值参数',
+    separator: '分隔符',
+    dynamic_name_tips: 'name(必填)',
+    dynamic_value_tips: 'params or value(必填)',
+    dynamic_separator_tips: '分隔符(必填)',
+    child_node_definition: '子节点定义',
+    child_node_instance: '子节点实例',
   },
   menu: {
     fav: '收藏组件',
diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts 
b/dolphinscheduler-ui/src/store/project/task-type.ts
index 993cf5a1f0..4583335a1a 100644
--- a/dolphinscheduler-ui/src/store/project/task-type.ts
+++ b/dolphinscheduler-ui/src/store/project/task-type.ts
@@ -32,6 +32,9 @@ export const TASK_TYPES_MAP = {
   SUB_PROCESS: {
     alias: 'SUB_PROCESS'
   },
+  DYNAMIC: {
+    alias: 'DYNAMIC'
+  },
   PROCEDURE: {
     alias: 'PROCEDURE'
   },
diff --git a/dolphinscheduler-ui/src/store/project/types.ts 
b/dolphinscheduler-ui/src/store/project/types.ts
index 7c2136f08c..e486bd9c5c 100644
--- a/dolphinscheduler-ui/src/store/project/types.ts
+++ b/dolphinscheduler-ui/src/store/project/types.ts
@@ -23,6 +23,7 @@ type TaskExecuteType = 'STREAM' | 'BATCH'
 type TaskType =
   | 'SHELL'
   | 'SUB_PROCESS'
+  | 'DYNAMIC'
   | 'PROCEDURE'
   | 'SQL'
   | 'SPARK'
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx 
b/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
index a76515d134..390968b2c5 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
@@ -158,7 +158,7 @@ const NodeDetailModal = defineComponent({
         },
         {
           text: t('project.node.enter_this_child_node'),
-          show: props.data.taskType === 'SUB_PROCESS',
+          show: props.data.taskType === 'SUB_PROCESS' || props.data.taskType 
=== 'DYNAMIC',
           disabled:
             !props.data.id ||
             (router.currentRoute.value.name === 'workflow-instance-detail' &&
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 0015118fa2..0245e55dcf 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -88,3 +88,4 @@ export { useKubeflow } from './use-kubeflow'
 export { useLinkis } from './use-linkis'
 export { useDataFactory } from './use-data-factory'
 export { useRemoteShell } from './use-remote-shell'
+export { useDynamic } from './use-dynamic'
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
new file mode 100644
index 0000000000..08e47c2d98
--- /dev/null
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import type { IJsonItem } from '../types'
+import { useI18n } from 'vue-i18n'
+
+/**
+ * Build the form items of the DYNAMIC task node: the sub workflow instance limit, the degree of
+ * parallelism, the list parameters (name / value / separator rows) and the filter condition.
+ */
+export function useDynamic(model: { [field: string]: any }): IJsonItem[] {
+  const { t } = useI18n()
+
+  return [
+    {
+      type: 'input-number',
+      field: 'maxNumOfSubWorkflowInstances',
+      span: 12,
+      name: t('project.node.max_num_of_sub_workflow_instances'),
+      validate: {
+        required: true
+      }
+    },
+    {
+      type: 'input-number',
+      field: 'degreeOfParallelism',
+      span: 12,
+      name: t('project.node.parallelism'),
+      validate: {
+        required: true
+      }
+    },
+    {
+      type: 'custom-parameters',
+      field: 'listParameters',
+      name: t('project.node.params_value'),
+      span: 24,
+      children: [
+        {
+          type: 'input',
+          field: 'name',
+          span: 8,
+          props: {
+            placeholder: t('project.node.dynamic_name_tips'),
+            maxLength: 256
+          },
+          validate: {
+            trigger: ['input', 'blur'],
+            required: true,
+            validator(validate: any, value: string) {
+              if (!value) {
+                return new Error(t('project.node.dynamic_name_tips'))
+              }
+
+              // tolerate a model whose list has not been initialised yet
+              const sameItems = (model['listParameters'] || []).filter(
+                (item: { name: string }) => item.name === value
+              )
+
+              // the row being edited matches itself once, so only a second match is a duplicate
+              if (sameItems.length > 1) {
+                return new Error(t('project.node.prop_repeat'))
+              }
+            }
+          }
+        },
+        {
+          type: 'input',
+          field: 'value',
+          span: 8,
+          props: {
+            placeholder: t('project.node.dynamic_value_tips'),
+            maxLength: 256
+          },
+          validate: {
+            trigger: ['input', 'blur'],
+            required: true,
+            validator(validate: any, value: string) {
+              if (!value) {
+                return new Error(t('project.node.dynamic_value_tips'))
+              }
+            }
+          }
+        },
+        {
+          type: 'input',
+          field: 'separator',
+          span: 4,
+          props: {
+            placeholder: t('project.node.dynamic_separator_tips'),
+            maxLength: 256
+          },
+          validate: {
+            trigger: ['input', 'blur'],
+            required: true,
+            validator(validate: any, value: string) {
+              if (!value) {
+                return new Error(t('project.node.dynamic_separator_tips'))
+              }
+            }
+          }
+        }
+      ]
+    },
+    {
+      type: 'input',
+      field: 'filterCondition',
+      span: 24,
+      name: t('project.node.filter_condition')
+    }
+  ]
+}
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 8ee6140b4c..efc01d975e 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -32,7 +32,7 @@ export function formatParams(data: INodeData): {
   taskDefinitionJsonObj: object
 } {
   const taskParams: ITaskParams = {}
-  if (data.taskType === 'SUB_PROCESS') {
+  if (data.taskType === 'SUB_PROCESS' || data.taskType === 'DYNAMIC') {
     taskParams.processDefinitionCode = data.processDefinitionCode
   }
 
@@ -480,6 +480,14 @@ export function formatParams(data: INodeData): {
     taskParams.datasource = data.datasource
   }
 
+  if (data.taskType === 'DYNAMIC') {
+    taskParams.processDefinitionCode = data.processDefinitionCode
+    taskParams.maxNumOfSubWorkflowInstances = data.maxNumOfSubWorkflowInstances
+    taskParams.degreeOfParallelism = data.degreeOfParallelism
+    taskParams.filterCondition = data.filterCondition
+    taskParams.listParameters = data.listParameters
+  }
+
   let timeoutNotifyStrategy = ''
   if (data.timeoutNotifyStrategy) {
     if (data.timeoutNotifyStrategy.length === 1) {
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 0372421357..2d489feced 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -52,10 +52,13 @@ import { useKubeflow } from './use-kubeflow'
 import { useLinkis } from './use-linkis'
 import { useDataFactory } from './use-data-factory'
 import { useRemoteShell } from './use-remote-shell'
+// @ts-ignore
+import { useDynamic } from './use-dynamic'
 
 export default {
   SHELL: useShell,
   SUB_PROCESS: useSubProcess,
+  DYNAMIC: useDynamic,
   PYTHON: usePython,
   SPARK: useSpark,
   MR: useMr,
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts
new file mode 100644
index 0000000000..b078e9f85d
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import { useRouter } from 'vue-router'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData } from '../types'
+import { ITaskData } from '../types'
+
+/**
+ * Builds the reactive form model and the ordered form-field descriptors
+ * for a DYNAMIC task node.
+ *
+ * @param projectCode forwarded to the task-definition, task-group and
+ *   child-node field builders
+ * @param from defaults to 0; when it is 1 the child-node field receives
+ *   code 0 instead of the workflow code read from the current route
+ * @param readonly forwarded to Fields.useTaskDefinition
+ * @param data optional existing task data; passed to
+ *   Fields.useTaskDefinition, and its id / processName are read for the
+ *   environment and child-node fields
+ * @returns an object holding json (the field descriptors) and model (the
+ *   reactive form state)
+ */
+export function useDynamic({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const router = useRouter()
+  const workflowCode = router.currentRoute.value.params.code
+  const model = reactive({
+    taskType: 'DYNAMIC',
+    name: '',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    maxNumOfSubWorkflowInstances: 1024,
+    degreeOfParallelism: 1,
+    filterCondition: '',
+    listParameters: [{ name: null, value: null, separator: ',' }]
+  } as INodeData)
+
+  // Mark the first (default) list-parameter row as disabled.
+  if (model.listParameters?.length) {
+    model.listParameters[0].disabled = true
+  }
+
+  return {
+    json: [
+      Fields.useName(from),
+      // Receives every input needed for the task-type / process-name fields
+      // (presumably builds them when from === 1 - confirm against its source).
+      ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
+      Fields.useRunFlag(),
+      Fields.useDescription(),
+      Fields.useTaskPriority(),
+      Fields.useWorkerGroup(),
+      Fields.useEnvironmentName(model, !data?.id),
+      ...Fields.useTaskGroup(model, projectCode),
+      ...Fields.useTimeoutAlarm(model),
+      Fields.useChildNode({
+        model,
+        projectCode,
+        from,
+        processName: data?.processName,
+        // The route's workflow code is not used when from === 1.
+        code: from === 1 ? 0 : Number(workflowCode)
+      }),
+      ...Fields.useDynamic(model),
+      Fields.usePreTasks()
+    ] as IJsonItem[],
+    model
+  }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index e65045b140..661c43776c 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -429,6 +429,10 @@ interface ITaskParams {
   factoryName?: string
   resourceGroupName?: string
   pipelineName?: string
+  maxNumOfSubWorkflowInstances?: number
+  degreeOfParallelism?: number
+  filterCondition?: string
+  listParameters?: Array<any>
 }
 
 interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index 03d523cdaf..9d6e26b640 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -17,6 +17,7 @@
 export type TaskType =
   | 'SHELL'
   | 'SUB_PROCESS'
+  | 'DYNAMIC'
   | 'PROCEDURE'
   | 'SQL'
   | 'SPARK'
@@ -65,6 +66,9 @@ export const TASK_TYPES_MAP = {
   SUB_PROCESS: {
     alias: 'SUB_PROCESS'
   },
+  DYNAMIC: {
+    alias: 'DYNAMIC'
+  },
   PROCEDURE: {
     alias: 'PROCEDURE'
   },
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index b9a52da161..9bcb2fd7e9 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -107,6 +107,9 @@ $bgLight: #ffffff;
     &.icon-sub_process {
       background-image: url('/images/task-icons/sub_process.png');
     }
+    &.icon-dynamic {
+      background-image: url('/images/task-icons/dynamic.png');
+    }
     &.icon-data_quality {
       background-image: url('/images/task-icons/data_quality.png');
     }
@@ -220,6 +223,9 @@ $bgLight: #ffffff;
       &.icon-sub_process {
         background-image: url('/images/task-icons/sub_process_hover.png');
       }
+      &.icon-dynamic {
+        background-image: url('/images/task-icons/dynamic_hover.png');
+      }
       &.icon-data_quality {
         background-image: url('/images/task-icons/data_quality_hover.png');
       }

Reply via email to