This is an automated email from the ASF dual-hosted git repository.
gallardot pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f56d51cc28 [DSIP-72][Dynamic Task] Remove dynamic task type (#16842)
f56d51cc28 is described below
commit f56d51cc28ab4511e2688875da9870a46c57ee46
Author: xiangzihao <[email protected]>
AuthorDate: Thu Dec 5 15:09:27 2024 +0800
[DSIP-72][Dynamic Task] Remove dynamic task type (#16842)
* remove dynamic task type
* remove dynamic task type
* remove dynamic task type
* resolve conflicts
* resolve conflicts
* resolve conflicts
* fix comment
* fix ci
---
docs/configs/docsdev.js | 8 -
docs/docs/en/guide/task/dynamic.md | 77 -----
docs/docs/en/guide/upgrade/incompatible.md | 1 +
docs/docs/zh/guide/task/dynamic.md | 76 -----
docs/docs/zh/guide/upgrade/incompatible.md | 5 +-
docs/img/tasks/demo/dynamic_definition.png | Bin 27885 -> 0 bytes
docs/img/tasks/demo/dynamic_running.png | Bin 24718 -> 0 bytes
docs/img/tasks/icons/dynamic.png | Bin 10606 -> 0 bytes
.../apache/dolphinscheduler/api/enums/Status.java | 2 -
.../service/impl/WorkflowInstanceServiceImpl.java | 4 -
.../src/main/resources/task-type-config.yaml | 1 -
.../StopWorkflowInstanceExecuteFunctionTest.java | 12 +-
.../common/enums/WorkflowExecutionStatus.java | 21 +-
.../utils/placeholder/BusinessTimeUtils.java | 1 -
.../dolphinscheduler/dao/entity/ListenerEvent.java | 88 ------
.../3.3.0_schema/mysql/dolphinscheduler_dml.sql | 2 -
.../postgresql/dolphinscheduler_dml.sql | 2 -
.../impl/WorkflowInstanceDaoImplTest.java | 4 +-
.../dynamic/DynamicAsyncTaskExecuteFunction.java | 173 -----------
.../plugin/dynamic/DynamicCommandUtils.java | 86 ------
.../executor/plugin/dynamic/DynamicLogicTask.java | 339 ---------------------
.../dynamic/DynamicLogicTaskPluginFactory.java | 74 -----
.../executor/plugin/dynamic/DynamicOutput.java | 33 --
.../statemachine/IWorkflowStateAction.java | 1 -
.../statemachine/WorkflowWaitToRunStateAction.java | 117 -------
.../service/subworkflow/SubWorkflowService.java | 2 -
.../subworkflow/SubWorkflowServiceImpl.java | 8 -
.../task/api/task/DynamicLogicTaskChannel.java | 30 --
.../api/task/DynamicLogicTaskChannelFactory.java | 38 ---
.../plugin/task/api/utils/TaskTypeUtils.java | 5 -
.../plugin/task/api/TaskPluginManagerTest.java | 2 -
.../public/images/task-icons/dynamic.png | Bin 10606 -> 0 bytes
.../public/images/task-icons/dynamic_hover.png | Bin 11229 -> 0 bytes
dolphinscheduler-ui/src/locales/en_US/project.ts | 3 -
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 3 -
dolphinscheduler-ui/src/store/project/task-type.ts | 3 -
dolphinscheduler-ui/src/store/project/types.ts | 1 -
.../projects/task/components/node/detail-modal.tsx | 4 +-
.../projects/task/components/node/fields/index.ts | 1 -
.../task/components/node/fields/use-dynamic.ts | 120 --------
.../projects/task/components/node/format-data.ts | 10 +-
.../projects/task/components/node/tasks/index.ts | 2 -
.../task/components/node/tasks/use-dynamic.ts | 83 -----
.../src/views/projects/task/constants/task-type.ts | 4 -
.../workflow/components/dag/dag.module.scss | 6 -
45 files changed, 17 insertions(+), 1435 deletions(-)
diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 0fa7adf261..ef599cd118 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -101,10 +101,6 @@ export default {
title: 'SubWorkflow',
link:
'/en-us/docs/dev/user_doc/guide/task/sub-workflow.html',
},
- {
- title: 'Dynamic',
- link:
'/en-us/docs/dev/user_doc/guide/task/dynamic.html',
- },
{
title: 'Dependent',
link:
'/en-us/docs/dev/user_doc/guide/task/dependent.html',
@@ -816,10 +812,6 @@ export default {
title: 'SubWorkflow',
link:
'/zh-cn/docs/dev/user_doc/guide/task/sub-workflow.html',
},
- {
- title: 'Dynamic',
- link:
'/zh-cn/docs/dev/user_doc/guide/task/dynamic.html',
- },
{
title: 'Dependent',
link:
'/zh-cn/docs/dev/user_doc/guide/task/dependent.html',
diff --git a/docs/docs/en/guide/task/dynamic.md
b/docs/docs/en/guide/task/dynamic.md
deleted file mode 100644
index b304f78ee7..0000000000
--- a/docs/docs/en/guide/task/dynamic.md
+++ /dev/null
@@ -1,77 +0,0 @@
-# Dynamic Task
-
-## Overview
-
-Dynamic task can input multiple parameter lists, calculate all parameter
combinations through Cartesian product, and then execute each parameter
combination as a sub-workflow node.
-
-For example, we have a workflow with two input parameters, a, b.
-
-We can use the dynamic node to define this workflow definition as a node, and
then enter the parameter list
-
-- Parameter a: a1, a2
-- Parameter b: b1, b2
-
-Then the dynamic node will calculate four parameter combinations, which are
-- a1, b1
-- a1, b2
-- a2, b1
-- a2, b2
-
-Then execute these four parameter combinations as the startup parameters of
the sub-workflow node, and a total of four sub-workflow nodes are generated.
-
-## Create Task
-
-- Click `Project -> Management-Project -> Name-Workflow Definition`, and click
the "Create Workflow" button to enter the
- DAG editing page.
-- Drag from the toolbar <img src="../../../../img/tasks/icons/dynamic.png"
width="15"/> task node to canvas.
-
-The task definition is shown in the following figure:
-
-
-
-## Task Parameters
-
-[//]: # (TODO: use the commented anchor below once our website template
supports this syntax)
-[//]: # (- Please refer to [DolphinScheduler Task Parameters
Appendix](appendix.md#default-task-parameters) `Default Task
Parameters` section for default parameters.)
-
-- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md)
`Default Task Parameters` section for default parameters.
-
-| **Task Parameters** |
**Description**
|
-|-----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Child Node | Select the workflow definition of the
sub-workflow. You can jump to the workflow definition of the selected
sub-workflow by entering the sub-node in the upper right corner.
|
-| max num of sub workflow instances | The maximum number of sub-workflow
instances dynamically generated. After exceeding this upper limit, the
dynamically generated sub-workflow instances will no longer be executed.
|
-| Parallelism | The parallelism of the sub-workflow
instances dynamically generated, that is, the number of sub-workflow instances
executed at the same time.
|
-| Param Value | The parameter of the sub-workflow
instance dynamically generated, supports multiple parameters, and the
parameters are separated by delimiters.
|
-| Filter Condition | The filter condition of the sub-workflow
instance dynamically generated, supports multiple filter values, and the filter
conditions are separated by commas, such as `2022,2023`, which will filter the
parameter groups containing the values of 2022 and 2023. |
-
-## Task Parameters Output
-
-The output parameters of the dynamic node refer to the output parameters of
the sub-workflow. The output parameters of all sub-workflows will be collected
into a list as the output parameters of the dynamic node.
-
-When the downstream task is referenced, it can be referenced by
`${dynamic.out(TaskName)}`.
-
-The value is a json, as shown below
-
-```Json
-[
- { "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" },
"mappedTimes":1 },
- { "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" },
"mappedTimes":2 },
- { "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" },
"mappedTimes":3 }
-]
-```
-
-- `dynParams` the input parameters of the sub-workflow
-- `outputValue` is the output parameter of the sub-workflow. For example, the
`p` here is a string that splices the output parameters `a` and `b` of the
sub-workflow and outputs them in the form of variables `p`
-- `mappedTimes` is the index of the execution of the sub-workflow, starting
from 1
-
-## Running Status
-
-After the dynamic task is started, all parameter combinations will be
calculated according to the input parameter list, and then a sub-workflow
instance will be created for each parameter combination.
-
-When the dynamic task is running, it will periodically check the statistical
information of all current sub-workflow instances. If the parallelism is
greater than the number of sub-workflow instances running, it will trigger the
start of the appropriate number of sub-workflow instances (the sub-workflow
instances are created first, and then the start is triggered later).
-
-As shown below.
-
-
-
-The dynamic task will run successfully only when all sub-workflow instances
are running successfully.
diff --git a/docs/docs/en/guide/upgrade/incompatible.md
b/docs/docs/en/guide/upgrade/incompatible.md
index 12152c58a1..6ca21b2c0f 100644
--- a/docs/docs/en/guide/upgrade/incompatible.md
+++ b/docs/docs/en/guide/upgrade/incompatible.md
@@ -36,4 +36,5 @@ This document records the incompatible updates between each
version. You need to
* Remove the `registry-disconnect-strategy` in `application.yaml`
([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* Remove `exec-threads` in worker's `application.yaml`, please use
`physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in
master's `application.yaml`, please use `logic-task-config`
([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
* Drop unused column `other_params_json` in `t_ds_worker_group`
([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
+* Remove the `Dynamic` from the `Task Plugin`
([#16842])(https://github.com/apache/dolphinscheduler/pull/16842)
diff --git a/docs/docs/zh/guide/task/dynamic.md
b/docs/docs/zh/guide/task/dynamic.md
deleted file mode 100644
index ee1d20adb1..0000000000
--- a/docs/docs/zh/guide/task/dynamic.md
+++ /dev/null
@@ -1,76 +0,0 @@
-# 动态节点
-
-## 综述
-
-动态节点可以通过输入多个参数列表,通过笛卡尔积计算出多所有的参数组合,然后将每个参数组合作为一个子工作流节点执行。
-
-比如我们有一个工作流,它具有两个输入参数,a, b。
-我们可以通过动态节点,将这个工作流定义当做一个节点,然后输入参数列表
-- 参数a:a1, a2
-- 参数b:b1, b2
-
-那么动态节点会计算出四个参数组合,分别是
-- a1, b1
-- a1, b2
-- a2, b1
-- a2, b2
-
-然后将这四个参数组合作为子工作流节点的启动参数执行,共生成四个子工作流节点。
-
-## 创建任务
-
-- 点击项目管理 -> 项目名称 -> 工作流定义,点击”创建工作流”按钮,进入 DAG 编辑页面:
-
-- 拖动工具栏的 <img src="../../../../img/tasks/icons/dynamic.png" width="15"/>
任务节点到画板中。
-
-任务定义如下图所示:
-
-
-
-## 任务参数
-
-[//]: # (TODO: use the commented anchor below once our website template
supports this syntax)
-[//]: # (-
默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。)
-
-- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
-
-| **任务参数** | **描述**
|
-|----------|------------------------------------------------------------------------------|
-| 子节点 | 是选择子工作流的工作流定义,右上角进入该子节点可以跳转到所选子工作流的工作流定义
|
-| 动态生成实例上限 | 是指动态生成的子工作流实例的上限,超过该上限后,动态生成的子工作流实例将不再执行
|
-| 并行度 | 是指动态生成的子工作流实例的并行度,即同时执行的子工作流实例的数量
|
-| 取值参数 | 是指动态生成的子工作流实例的参数,支持多个参数,参数之间用分隔符分隔
|
-| 过滤条件 | 是指动态生成的子工作流实例的过滤条件,支持多个过滤值,过滤条件之间用逗号分隔, 如 `2022,2023`,
则会过来包含2022和2023的值的参数组 |
-
-## 任务参数输出
-
-动态节点的输出参数,是指子工作流的输出参数,所有子工作流的输出参数都会被收集到一个列表中,作为动态节点的输出参数。
-
-下游任务引用的时候,可以通过 `${dynamic.out(TaskName)}` 的方式引用。
-
-值为一个json,样例如下
-
-```Json
-[
- { "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" },
"mappedTimes":1 },
- { "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" },
"mappedTimes":2 },
- { "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" },
"mappedTimes":3 }
-]
-```
-
-其中
-- `dynParams` 是子工作流的输入参数
-- `outputValue` 是子工作流的输出参数, 如这里的`p`为将子工作流的输出参数`a`和`b`拼接起来的字符串,以变量`p`的形式输出
-- `mappedTimes` 是子工作流的执行的index,从1开始
-
-## 运行状态
-
-Dynamic任务启动后,会根据输入参数列表,计算出所有的参数组合,然后对每一个参数组合,创建一个子工作流实例。
-
-Dynamic运行时,会定期检测当前所有子工作流实例的统计信息,如果并行度大于运行中的子工作流实例的数量,则会触发启动合适数量的工作流实例(工作流实例是先创建,后续再出发启动)。
-
-如下如所示。
-
-
-
-当且进度所有的子工作流实例全部运行成功时,dynamic任务才会运行成功。
diff --git a/docs/docs/zh/guide/upgrade/incompatible.md
b/docs/docs/zh/guide/upgrade/incompatible.md
index 530e006c9c..927d8df78a 100644
--- a/docs/docs/zh/guide/upgrade/incompatible.md
+++ b/docs/docs/zh/guide/upgrade/incompatible.md
@@ -32,6 +32,7 @@
* 废弃从 1.x 至 2.x 的升级代码
([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* 移除 `数据质量` 模块
([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* 在`application.yaml`中移除`registry-disconnect-strategy`配置
([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
-*
在worker的`application.yaml`中移除`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代
([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
-* 在`t_ds_worker_group` 表中移除 无用的`other_params_json`字段
([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
+* 在 `worker` 的 `application.yaml` 中移除
`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代
([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
+* 在 `t_ds_worker_group` 表中移除 无用的 `other_params_json` 字段
([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
+* 从 `任务插件` 中移除 `Dynamic` 类型
([#16842])(https://github.com/apache/dolphinscheduler/pull/16842)
diff --git a/docs/img/tasks/demo/dynamic_definition.png
b/docs/img/tasks/demo/dynamic_definition.png
deleted file mode 100644
index e3eef9fa2c..0000000000
Binary files a/docs/img/tasks/demo/dynamic_definition.png and /dev/null differ
diff --git a/docs/img/tasks/demo/dynamic_running.png
b/docs/img/tasks/demo/dynamic_running.png
deleted file mode 100644
index cb7d8bcfaa..0000000000
Binary files a/docs/img/tasks/demo/dynamic_running.png and /dev/null differ
diff --git a/docs/img/tasks/icons/dynamic.png b/docs/img/tasks/icons/dynamic.png
deleted file mode 100644
index 6df7485872..0000000000
Binary files a/docs/img/tasks/icons/dynamic.png and /dev/null differ
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 1aacebb355..c9f0912723 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -274,8 +274,6 @@ public enum Status {
NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"),
STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair",
"状态码前后不一致或状态码和code不匹配"),
- TASK_INSTANCE_NOT_DYNAMIC_TASK(10213, "task instance {0} is not dynamic",
"任务实例[{0}]不是Dynamic类型"),
-
CREATE_PROJECT_PARAMETER_ERROR(10214, "create project parameter error",
"创建项目参数错误"),
UPDATE_PROJECT_PARAMETER_ERROR(10215, "update project parameter error",
"更新项目参数错误"),
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java
index 85753558fa..28c20b93ce 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java
@@ -484,10 +484,6 @@ public class WorkflowInstanceServiceImpl extends
BaseServiceImpl implements Work
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS,
taskId);
}
- if (!TaskTypeUtils.isDynamicTask(taskInstance.getTaskType())) {
- putMsg(result, Status.TASK_INSTANCE_NOT_DYNAMIC_TASK,
taskInstance.getName());
- throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS,
taskId);
- }
List<RelationSubWorkflow> relationSubWorkflows =
relationSubWorkflowMapper
.queryAllSubWorkflowInstance((long)
taskInstance.getWorkflowInstanceId(),
taskInstance.getTaskCode());
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
index 56053b74bd..81e964903f 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
@@ -41,7 +41,6 @@ task:
- 'DEPENDENT'
- 'CONDITIONS'
- 'SWITCH'
- - 'DYNAMIC'
dataIntegration:
- 'SEATUNNEL'
- 'DATAX'
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java
index 74ba73e2f0..9c6fa6ac5e 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java
@@ -50,8 +50,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
"RUNNING_EXECUTION",
"READY_PAUSE",
"READY_STOP",
- "SERIAL_WAIT",
- "WAIT_TO_RUN"})
+ "SERIAL_WAIT"})
void exceptionIfWorkflowInstanceCannotStop_canStop(WorkflowExecutionStatus
workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1");
@@ -65,8 +64,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
"RUNNING_EXECUTION",
"READY_PAUSE",
"READY_STOP",
- "SERIAL_WAIT",
- "WAIT_TO_RUN"}, mode = EnumSource.Mode.EXCLUDE)
+ "SERIAL_WAIT"}, mode = EnumSource.Mode.EXCLUDE)
void
exceptionIfWorkflowInstanceCannotStop_canNotStop(WorkflowExecutionStatus
workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1");
@@ -81,8 +79,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
@ParameterizedTest
@EnumSource(value = WorkflowExecutionStatus.class, names = {
- "SERIAL_WAIT",
- "WAIT_TO_RUN"})
+ "SERIAL_WAIT"})
void
ifWorkflowInstanceCanDirectStopInDB_canDirectStopInDB(WorkflowExecutionStatus
workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1");
@@ -93,8 +90,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
@ParameterizedTest
@EnumSource(value = WorkflowExecutionStatus.class, names = {
- "SERIAL_WAIT",
- "WAIT_TO_RUN"}, mode = EnumSource.Mode.EXCLUDE)
+ "SERIAL_WAIT"}, mode = EnumSource.Mode.EXCLUDE)
void
ifWorkflowInstanceCanDirectStopInDB_canNotDirectStopInDB(WorkflowExecutionStatus
workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1");
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
index caacc22deb..00e4c87e2a 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
@@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.common.enums;
import java.util.HashMap;
import java.util.Map;
+import lombok.Getter;
import lombok.NonNull;
import com.baomidou.mybatisplus.annotation.EnumValue;
+@Getter
public enum WorkflowExecutionStatus {
SUBMITTED_SUCCESS(0, "submitted"),
@@ -35,7 +37,6 @@ public enum WorkflowExecutionStatus {
FAILURE(6, "failure"),
SUCCESS(7, "success"),
SERIAL_WAIT(14, "serial wait"),
- WAIT_TO_RUN(17, "wait to run"),
FAILOVER(18, "failover");
private static final Map<Integer, WorkflowExecutionStatus> CODE_MAP = new
HashMap<>();
@@ -50,8 +51,7 @@ public enum WorkflowExecutionStatus {
RUNNING_EXECUTION.getCode(),
READY_PAUSE.getCode(),
READY_STOP.getCode(),
- SERIAL_WAIT.getCode(),
- WAIT_TO_RUN.getCode()
+ SERIAL_WAIT.getCode()
};
static {
@@ -80,12 +80,11 @@ public enum WorkflowExecutionStatus {
return this == RUNNING_EXECUTION
|| this == READY_PAUSE
|| this == READY_STOP
- || this == SERIAL_WAIT
- || this == WAIT_TO_RUN;
+ || this == SERIAL_WAIT;
}
public boolean canDirectStopInDB() {
- return this == SERIAL_WAIT || this == WAIT_TO_RUN;
+ return this == SERIAL_WAIT;
}
public boolean canPause() {
@@ -95,7 +94,7 @@ public enum WorkflowExecutionStatus {
}
public boolean canDirectPauseInDB() {
- return this == SERIAL_WAIT || this == WAIT_TO_RUN;
+ return this == SERIAL_WAIT;
}
public boolean isFinished() {
@@ -145,14 +144,6 @@ public enum WorkflowExecutionStatus {
this.desc = desc;
}
- public int getCode() {
- return code;
- }
-
- public String getDesc() {
- return desc;
- }
-
@Override
public String toString() {
return name();
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
index 6d6e5497d4..9d30aa71ca 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
@@ -59,7 +59,6 @@ public class BusinessTimeUtils {
case RECOVER_SUSPENDED_PROCESS:
case START_FAILURE_TASK_PROCESS:
case REPEAT_RUNNING:
- case DYNAMIC_GENERATION:
case SCHEDULER:
default:
businessDate = addDays(new Date(), -1);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ListenerEvent.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ListenerEvent.java
deleted file mode 100644
index 36fe939ce8..0000000000
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ListenerEvent.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.dao.entity;
-
-import org.apache.dolphinscheduler.common.enums.AlertStatus;
-import org.apache.dolphinscheduler.common.enums.ListenerEventType;
-
-import java.util.Date;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
-
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@TableName("t_ds_listener_event")
-public class ListenerEvent {
-
- /**
- * primary key
- */
- @TableId(value = "id", type = IdType.AUTO)
- private Integer id;
-
- /**
- * content
- */
- @TableField(value = "content")
- private String content;
-
- /**
- * sign
- */
- @TableField(value = "sign")
- private String sign;
-
- /**
- * alert_status
- */
- @TableField(value = "event_type")
- private ListenerEventType eventType;
-
- /**
- * post_status
- */
- @TableField("post_status")
- private AlertStatus postStatus;
-
- /**
- * log
- */
- @TableField("log")
- private String log;
- /**
- * create_time
- */
- @TableField("create_time")
- private Date createTime;
-
- /**
- * update_time
- */
- @TableField("update_time")
- private Date updateTime;
-}
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql
index 780325d42f..2f6a4566d8 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql
@@ -19,5 +19,3 @@ UPDATE t_ds_task_definition SET task_type = 'SUB_WORKFLOW'
WHERE task_type = 'SU
UPDATE t_ds_task_definition_log SET task_type = 'SUB_WORKFLOW' WHERE task_type
= 'SUB_PROCESS';
UPDATE t_ds_task_definition SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type =
'SUB_WORKFLOW';
UPDATE t_ds_task_definition_log SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type =
'SUB_WORKFLOW';
-UPDATE t_ds_task_definition SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';
-UPDATE t_ds_task_definition_log SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql
index 780325d42f..2f6a4566d8 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql
@@ -19,5 +19,3 @@ UPDATE t_ds_task_definition SET task_type = 'SUB_WORKFLOW'
WHERE task_type = 'SU
UPDATE t_ds_task_definition_log SET task_type = 'SUB_WORKFLOW' WHERE task_type
= 'SUB_PROCESS';
UPDATE t_ds_task_definition SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type =
'SUB_WORKFLOW';
UPDATE t_ds_task_definition_log SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type =
'SUB_WORKFLOW';
-UPDATE t_ds_task_definition SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';
-UPDATE t_ds_task_definition_log SET task_params = replace(task_params,
'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
index a12c57abea..520a9b907e 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
@@ -64,9 +64,7 @@ class WorkflowInstanceDaoImplTest extends BaseDaoTest {
WorkflowExecutionStatus.READY_STOP));
workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode,
workflowDefinitionVersion,
WorkflowExecutionStatus.SERIAL_WAIT));
-
workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode,
workflowDefinitionVersion,
- WorkflowExecutionStatus.WAIT_TO_RUN));
- assertEquals(5, workflowInstanceDao
+ assertEquals(4, workflowInstanceDao
.queryByWorkflowCodeVersionStatus(workflowDefinitionCode,
workflowDefinitionVersion, status).size());
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicAsyncTaskExecuteFunction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicAsyncTaskExecuteFunction.java
deleted file mode 100644
index b8ba05fe7a..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicAsyncTaskExecuteFunction.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
-
-import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_DYNAMIC_START_PARAMS;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class DynamicAsyncTaskExecuteFunction {
-
- private static final Duration TASK_EXECUTE_STATE_CHECK_INTERVAL =
Duration.ofSeconds(10);
-
- private static final String OUTPUT_KEY = "dynamic.out";
-
- private final WorkflowInstance workflowInstance;
-
- private final TaskInstance taskInstance;
-
- private final SubWorkflowService subWorkflowService;
-
- private final CommandMapper commandMapper;
-
- private final int degreeOfParallelism;
-
- private final DynamicLogicTask logicTask;
-
- public DynamicAsyncTaskExecuteFunction(TaskExecutionContext
taskExecutionContext,
- WorkflowInstance workflowInstance,
- TaskInstance taskInstance,
- DynamicLogicTask dynamicLogicTask,
- CommandMapper commandMapper,
- SubWorkflowService
subWorkflowService,
- int degreeOfParallelism) {
- this.workflowInstance = workflowInstance;
- this.taskInstance = taskInstance;
- this.logicTask = dynamicLogicTask;
- this.degreeOfParallelism = degreeOfParallelism;
-
- this.commandMapper = commandMapper;
- this.subWorkflowService = subWorkflowService;
- }
-
- public @NonNull TaskExecutionStatus getAsyncTaskExecutionStatus() {
- List<WorkflowInstance> allSubWorkflowInstance =
getAllSubProcessInstance();
- int totalSubProcessInstanceCount = allSubWorkflowInstance.size();
-
- List<WorkflowInstance> finishedSubWorkflowInstance =
-
subWorkflowService.filterFinishProcessInstances(allSubWorkflowInstance);
-
- if (finishedSubWorkflowInstance.size() ==
totalSubProcessInstanceCount) {
- log.info("all sub process instance finish");
- int successCount =
subWorkflowService.filterSuccessProcessInstances(finishedSubWorkflowInstance).size();
- log.info("success sub process instance count: {}", successCount);
- if (successCount == totalSubProcessInstanceCount) {
- log.info("all sub process instance success");
- setOutputParameters();
- return TaskExecutionStatus.SUCCESS;
- } else {
- int failedCount = totalSubProcessInstanceCount - successCount;
- log.info("failed sub process instance count: {}", failedCount);
- return TaskExecutionStatus.FAILURE;
- }
- }
-
- if (logicTask.isCancel()) {
- return TaskExecutionStatus.FAILURE;
- }
-
- int runningCount =
subWorkflowService.filterRunningProcessInstances(allSubWorkflowInstance).size();
- int startCount = degreeOfParallelism - runningCount;
- if (startCount > 0) {
- log.info("There are {} sub process instances that can be started",
startCount);
- startSubProcessInstances(allSubWorkflowInstance, startCount);
- }
- // query the status of sub workflow instance
- return TaskExecutionStatus.RUNNING_EXECUTION;
- }
-
- private void setOutputParameters() {
- log.info("set varPool");
- List<WorkflowInstance> allSubWorkflowInstance =
getAllSubProcessInstance();
-
- List<DynamicOutput> dynamicOutputs = new ArrayList<>();
- int index = 1;
- for (WorkflowInstance workflowInstance : allSubWorkflowInstance) {
- DynamicOutput dynamicOutput = new DynamicOutput();
- Map<String, String> dynamicParams =
-
JSONUtils.toMap(JSONUtils.toMap(workflowInstance.getCommandParam()).get(CMD_DYNAMIC_START_PARAMS));
- dynamicOutput.setDynParams(dynamicParams);
-
- Map<String, String> outputValueMap = new HashMap<>();
- List<Property> propertyList =
subWorkflowService.getWorkflowOutputParameters(workflowInstance);
- for (Property property : propertyList) {
- outputValueMap.put(property.getProp(), property.getValue());
- }
-
- dynamicOutput.setOutputValue(outputValueMap);
- dynamicOutput.setMappedTimes(index++);
- dynamicOutputs.add(dynamicOutput);
- }
-
- Property property = new Property();
- property.setProp(String.format("%s(%s)", OUTPUT_KEY,
taskInstance.getName()));
- property.setDirect(Direct.OUT);
- property.setType(DataType.VARCHAR);
- property.setValue(JSONUtils.toJsonString(dynamicOutputs));
-
- List<Property> taskPropertyList = new
ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
- taskPropertyList.add(property);
- //
logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));
-
- log.info("set property: {}", property);
- }
-
- private void startSubProcessInstances(List<WorkflowInstance>
allSubWorkflowInstance, int startCount) {
- List<WorkflowInstance> waitingWorkflowInstances =
-
subWorkflowService.filterWaitToRunProcessInstances(allSubWorkflowInstance);
-
- for (int i = 0; i < Math.min(startCount,
waitingWorkflowInstances.size()); i++) {
- WorkflowInstance subWorkflowInstance =
waitingWorkflowInstances.get(i);
- Map<String, String> parameters =
JSONUtils.toMap(DynamicCommandUtils
-
.getDataFromCommandParam(subWorkflowInstance.getCommandParam(),
CMD_DYNAMIC_START_PARAMS));
- Command command =
DynamicCommandUtils.createCommand(this.workflowInstance,
- subWorkflowInstance.getWorkflowDefinitionCode(),
subWorkflowInstance.getWorkflowDefinitionVersion(),
- parameters);
- command.setWorkflowInstanceId(subWorkflowInstance.getId());
- commandMapper.insert(command);
- log.info("start sub process instance, sub process instance id: {},
command: {}",
- subWorkflowInstance.getId(),
- command);
- }
- }
-
- public List<WorkflowInstance> getAllSubProcessInstance() {
- return
subWorkflowService.getAllDynamicSubWorkflow(workflowInstance.getId(),
taskInstance.getTaskCode());
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicCommandUtils.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicCommandUtils.java
deleted file mode 100644
index 2dc76c9e72..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicCommandUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
-
-import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.TaskDependType;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-
-public class DynamicCommandUtils {
-
- static public Command createCommand(WorkflowInstance workflowInstance,
- Long subProcessDefinitionCode,
- Integer subProcessDefinitionVersion,
- Map<String, String> parameters) {
- Command command = new Command();
- if
(workflowInstance.getCommandType().equals(CommandType.START_PROCESS)) {
- command.setCommandType(CommandType.DYNAMIC_GENERATION);
- } else {
- command.setCommandType(workflowInstance.getCommandType());
- }
- command.setWorkflowDefinitionCode(subProcessDefinitionCode);
- command.setWorkflowDefinitionVersion(subProcessDefinitionVersion);
- command.setTaskDependType(TaskDependType.TASK_POST);
- command.setFailureStrategy(workflowInstance.getFailureStrategy());
- command.setWarningType(workflowInstance.getWarningType());
-
- String globalParams = workflowInstance.getGlobalParams();
- if (StringUtils.isNotEmpty(globalParams)) {
- List<Property> parentParams =
Lists.newArrayList(JSONUtils.toList(globalParams, Property.class));
- for (Property parentParam : parentParams) {
- parameters.put(parentParam.getProp(), parentParam.getValue());
- }
- }
-
- addDataToCommandParam(command,
CommandKeyConstants.CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(parameters));
- command.setExecutorId(workflowInstance.getExecutorId());
- command.setWarningGroupId(workflowInstance.getWarningGroupId());
-
command.setWorkflowInstancePriority(workflowInstance.getWorkflowInstancePriority());
- command.setWorkerGroup(workflowInstance.getWorkerGroup());
- command.setDryRun(workflowInstance.getDryRun());
- command.setTenantCode(workflowInstance.getTenantCode());
- return command;
- }
-
- static public String getDataFromCommandParam(String commandParam, String
key) {
- Map<String, String> cmdParam = JSONUtils.toMap(commandParam);
- return cmdParam.get(key);
- }
-
- static void addDataToCommandParam(Command command, String key, String
data) {
- Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
- if (cmdParam == null) {
- cmdParam = new HashMap<>();
- }
- cmdParam.put(key, data);
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTask.java
deleted file mode 100644
index 40fd7ab69b..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTask.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
-
-import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
-import org.apache.dolphinscheduler.extract.base.client.Clients;
-import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
-import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
-import
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
-import
org.apache.dolphinscheduler.server.master.engine.executor.plugin.AbstractLogicTask;
-import
org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
-import
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import lombok.extern.slf4j.Slf4j;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-
-@Slf4j
-public class DynamicLogicTask extends AbstractLogicTask<DynamicParameters> {
-
- public static final String TASK_TYPE = "DYNAMIC";
- private final WorkflowInstanceDao workflowInstanceDao;
-
- private final SubWorkflowService subWorkflowService;
-
- private final WorkflowDefinitionMapper workflowDefinitionMapper;
-
- private final CommandMapper commandMapper;
-
- private final ProcessService processService;
-
- private WorkflowInstance workflowInstance;
-
- private TaskInstance taskInstance;
-
- private final TaskExecutionContext taskExecutionContext;
-
- private boolean haveBeenCanceled = false;
-
- public DynamicLogicTask(TaskExecutionContext taskExecutionContext,
- WorkflowInstanceDao workflowInstanceDao,
- TaskInstanceDao taskInstanceDao,
- SubWorkflowService subWorkflowService,
- ProcessService processService,
- WorkflowDefinitionMapper workflowDefinitionMapper,
- CommandMapper commandMapper) {
- super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
- this.workflowInstanceDao = workflowInstanceDao;
- this.subWorkflowService = subWorkflowService;
- this.processService = processService;
- this.workflowDefinitionMapper = workflowDefinitionMapper;
- this.commandMapper = commandMapper;
-
- this.workflowInstance =
workflowInstanceDao.queryById(taskExecutionContext.getWorkflowInstanceId());
- this.taskInstance =
taskInstanceDao.queryById(taskExecutionContext.getTaskInstanceId());
- }
-
- // public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws
MasterTaskExecuteException {
- // List<Map<String, String>> parameterGroup = generateParameterGroup();
- //
- // if (parameterGroup.size() >
dynamicParameters.getMaxNumOfSubWorkflowInstances()) {
- // log.warn("the number of sub process instances [{}] exceeds the maximum
limit [{}]", parameterGroup.size(),
- // dynamicParameters.getMaxNumOfSubWorkflowInstances());
- // parameterGroup = parameterGroup.subList(0,
dynamicParameters.getMaxNumOfSubWorkflowInstances());
- // }
- //
- // // if already exists sub process instance, do not generate again
- // List<WorkflowInstance> existsSubWorkflowInstanceList =
- // subWorkflowService.getAllDynamicSubWorkflow(workflowInstance.getId(),
taskInstance.getTaskCode());
- // if (CollectionUtils.isEmpty(existsSubWorkflowInstanceList)) {
- // generateSubWorkflowInstance(parameterGroup);
- // } else {
- // resetProcessInstanceStatus(existsSubWorkflowInstanceList);
- // }
- // return new DynamicAsyncTaskExecuteFunction(taskExecutionContext,
workflowInstance, taskInstance, this,
- // commandMapper,
- // subWorkflowService, dynamicParameters.getDegreeOfParallelism());
- // }
-
- public void resetProcessInstanceStatus(List<WorkflowInstance>
existsSubWorkflowInstanceList) {
- switch (workflowInstance.getCommandType()) {
- case REPEAT_RUNNING:
- existsSubWorkflowInstanceList.forEach(processInstance -> {
-
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
- workflowInstanceDao.updateById(processInstance);
- });
- break;
- case START_FAILURE_TASK_PROCESS:
- case RECOVER_TOLERANCE_FAULT_PROCESS:
- List<WorkflowInstance> failedWorkflowInstances =
-
subWorkflowService.filterFailedProcessInstances(existsSubWorkflowInstanceList);
- failedWorkflowInstances.forEach(processInstance -> {
-
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
- workflowInstanceDao.updateById(processInstance);
- });
- break;
- }
- }
-
- public void generateSubWorkflowInstance(List<Map<String, String>>
parameterGroup) throws MasterTaskExecuteException {
- List<WorkflowInstance> workflowInstanceList = new ArrayList<>();
- WorkflowDefinition subWorkflowDefinition =
-
workflowDefinitionMapper.queryByCode(taskParameters.getWorkflowDefinitionCode());
- for (Map<String, String> parameters : parameterGroup) {
- String dynamicStartParams = JSONUtils.toJsonString(parameters);
- Command command =
DynamicCommandUtils.createCommand(workflowInstance,
subWorkflowDefinition.getCode(),
- subWorkflowDefinition.getVersion(), parameters);
- // todo: set id to -1? we use command to generate sub process
instance, but the generate method will use the
- // command id to do
- // somethings
- command.setId(-1);
- DynamicCommandUtils.addDataToCommandParam(command,
CommandKeyConstants.CMD_DYNAMIC_START_PARAMS,
- dynamicStartParams);
- WorkflowInstance subWorkflowInstance =
createSubProcessInstance(command);
- subWorkflowInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
- workflowInstanceDao.insert(subWorkflowInstance);
- command.setWorkflowInstanceId(subWorkflowInstance.getId());
- workflowInstanceList.add(subWorkflowInstance);
- }
-
- List<RelationSubWorkflow> relationSubWorkflowList = new ArrayList<>();
- for (WorkflowInstance subWorkflowInstance : workflowInstanceList) {
- RelationSubWorkflow relationSubWorkflow = new
RelationSubWorkflow();
-
relationSubWorkflow.setParentWorkflowInstanceId(Long.valueOf(workflowInstance.getId()));
- relationSubWorkflow.setParentTaskCode(taskInstance.getTaskCode());
-
relationSubWorkflow.setSubWorkflowInstanceId(Long.valueOf(subWorkflowInstance.getId()));
- relationSubWorkflowList.add(relationSubWorkflow);
- }
-
- log.info("Expected number of runs : {}, actual number of runs : {}",
parameterGroup.size(),
- workflowInstanceList.size());
-
- int insertN =
subWorkflowService.batchInsertRelationSubWorkflow(relationSubWorkflowList);
- log.info("insert {} relation sub workflow", insertN);
- }
-
- public WorkflowInstance createSubProcessInstance(Command command) throws
MasterTaskExecuteException {
- WorkflowInstance subWorkflowInstance;
- try {
- subWorkflowInstance =
processService.constructWorkflowInstance(command, workflowInstance.getHost());
- subWorkflowInstance.setIsSubWorkflow(Flag.YES);
- subWorkflowInstance.setVarPool(taskExecutionContext.getVarPool());
- } catch (Exception e) {
- log.error("create sub process instance error", e);
- throw new MasterTaskExecuteException(e.getMessage());
- }
- return subWorkflowInstance;
- }
-
- public List<Map<String, String>> generateParameterGroup() {
- List<DynamicInputParameter> dynamicInputParameters =
getDynamicInputParameters();
- Set<String> filterStrings =
-
Arrays.stream(StringUtils.split(taskParameters.getFilterCondition(),
",")).map(String::trim)
- .collect(Collectors.toSet());
-
- List<List<DynamicInputParameter>> allParameters = new ArrayList<>();
- for (DynamicInputParameter dynamicInputParameter :
dynamicInputParameters) {
- List<DynamicInputParameter> singleParameters = new ArrayList<>();
- String value = dynamicInputParameter.getValue();
- String separator = dynamicInputParameter.getSeparator();
- List<String> valueList =
- Arrays.stream(StringUtils.split(value,
separator)).map(String::trim).collect(Collectors.toList());
-
- valueList = valueList.stream().filter(v ->
!filterStrings.contains(v)).collect(Collectors.toList());
-
- for (String v : valueList) {
- DynamicInputParameter singleParameter = new
DynamicInputParameter();
- singleParameter.setName(dynamicInputParameter.getName());
- singleParameter.setValue(v);
- singleParameters.add(singleParameter);
- }
- allParameters.add(singleParameters);
- }
-
- // use Sets.cartesianProduct to get the cartesian product of all
parameters
- List<List<DynamicInputParameter>> cartesianProduct =
Lists.cartesianProduct(allParameters);
-
- // convert cartesian product to parameter group List<Map<name:value>>
- List<Map<String, String>> parameterGroup =
cartesianProduct.stream().map(
- inputParameterList -> inputParameterList.stream().collect(
- Collectors.toMap(DynamicInputParameter::getName,
DynamicInputParameter::getValue)))
- .collect(Collectors.toList());
-
- log.info("parameter group size: {}", parameterGroup.size());
- // log every parameter group
- if (CollectionUtils.isNotEmpty(parameterGroup)) {
- for (Map<String, String> map : parameterGroup) {
- log.info("parameter group: {}", map);
- }
- }
- return parameterGroup;
- }
-
- private List<DynamicInputParameter> getDynamicInputParameters() {
- List<DynamicInputParameter> dynamicInputParameters =
taskParameters.getListParameters();
- if (CollectionUtils.isNotEmpty(dynamicInputParameters)) {
- for (DynamicInputParameter dynamicInputParameter :
dynamicInputParameters) {
- String value = dynamicInputParameter.getValue();
- Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
- value = ParameterUtils.convertParameterPlaceholders(value,
ParameterUtils.convert(paramsMap));
- dynamicInputParameter.setValue(value);
- }
- }
- return dynamicInputParameters;
- }
-
- @Override
- public void start() throws MasterTaskExecuteException {
- // todo:
- }
-
- @Override
- public TaskExecutionStatus getTaskExecutionState() {
- return taskExecutionContext.getCurrentExecutionStatus();
- }
-
- @Override
- public void pause() throws MasterTaskExecuteException {
- // todo: support pause
- }
-
- @Override
- public void kill() {
- try {
- doKillSubWorkflowInstances();
- } catch (MasterTaskExecuteException e) {
- log.error("kill {} error", taskInstance.getName(), e);
- }
- }
-
- @Override
- public ITaskParameterDeserializer<DynamicParameters>
getTaskParameterDeserializer() {
- return taskParamsJson -> JSONUtils.parseObject(taskParamsJson, new
TypeReference<DynamicParameters>() {
- });
- }
-
- private void doKillSubWorkflowInstances() throws
MasterTaskExecuteException {
- List<WorkflowInstance> existsSubWorkflowInstanceList =
-
subWorkflowService.getAllDynamicSubWorkflow(workflowInstance.getId(),
taskInstance.getTaskCode());
- if (CollectionUtils.isEmpty(existsSubWorkflowInstanceList)) {
- return;
- }
-
- commandMapper.deleteByWorkflowInstanceIds(
-
existsSubWorkflowInstanceList.stream().map(WorkflowInstance::getId).collect(Collectors.toList()));
-
- List<WorkflowInstance> runningSubWorkflowInstanceList =
-
subWorkflowService.filterRunningProcessInstances(existsSubWorkflowInstanceList);
- doKillRunningSubWorkflowInstances(runningSubWorkflowInstanceList);
-
- List<WorkflowInstance> waitToRunWorkflowInstances =
-
subWorkflowService.filterWaitToRunProcessInstances(existsSubWorkflowInstanceList);
- doKillWaitToRunSubWorkflowInstances(waitToRunWorkflowInstances);
-
- this.haveBeenCanceled = true;
- }
-
- private void doKillRunningSubWorkflowInstances(List<WorkflowInstance>
runningSubWorkflowInstanceList) throws MasterTaskExecuteException {
- for (WorkflowInstance subWorkflowInstance :
runningSubWorkflowInstanceList) {
- try {
- WorkflowInstanceStopResponse workflowInstanceStopResponse =
Clients
- .withService(IWorkflowControlClient.class)
- .withHost(subWorkflowInstance.getHost())
- .stopWorkflowInstance(new
WorkflowInstanceStopRequest(subWorkflowInstance.getId()));
- if (workflowInstanceStopResponse.isSuccess()) {
- log.info("Stop SubWorkflow: {} successfully",
subWorkflowInstance.getName());
- } else {
- throw new MasterTaskExecuteException(
- "Stop subWorkflow: " +
subWorkflowInstance.getName() + " failed");
- }
- } catch (MasterTaskExecuteException me) {
- throw me;
- } catch (Exception e) {
- throw new MasterTaskExecuteException(
- String.format("Send stop request to SubWorkflow's
master: %s failed",
- subWorkflowInstance.getHost()),
- e);
- }
- }
- }
-
- private void doKillWaitToRunSubWorkflowInstances(List<WorkflowInstance>
waitToRunWorkflowInstances) {
- for (WorkflowInstance subWorkflowInstance :
waitToRunWorkflowInstances) {
- subWorkflowInstance.setState(WorkflowExecutionStatus.STOP);
- workflowInstanceDao.updateById(subWorkflowInstance);
- }
- }
-
- public boolean isCancel() {
- return haveBeenCanceled;
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTaskPluginFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTaskPluginFactory.java
deleted file mode 100644
index 5065e80c7f..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTaskPluginFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
-
-import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.server.master.engine.executor.plugin.ILogicTaskPluginFactory;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
-import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class DynamicLogicTaskPluginFactory implements
ILogicTaskPluginFactory<DynamicLogicTask> {
-
- @Autowired
- private WorkflowInstanceDao workflowInstanceDao;
-
- @Autowired
- private TaskInstanceDao taskInstanceDao;
-
- @Autowired
- private WorkflowDefinitionMapper processDefineMapper;
-
- @Autowired
- private CommandMapper commandMapper;
-
- @Autowired
- private ProcessService processService;
-
- @Autowired
- SubWorkflowService subWorkflowService;
-
- @Override
- public DynamicLogicTask createLogicTask(final ITaskExecutor taskExecutor) {
- final TaskExecutionContext taskExecutionContext =
taskExecutor.getTaskExecutionContext();
- return new DynamicLogicTask(taskExecutionContext,
- workflowInstanceDao,
- taskInstanceDao,
- subWorkflowService,
- processService,
- processDefineMapper,
- commandMapper);
-
- }
-
- @Override
- public String getTaskType() {
- return DynamicLogicTask.TASK_TYPE;
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicOutput.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicOutput.java
deleted file mode 100644
index c727fcb08e..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicOutput.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
-
-import java.util.Map;
-
-import lombok.Data;
-
-@Data
-public class DynamicOutput {
-
- private Map<String, String> dynParams;
-
- private Map<String, String> outputValue;
-
- private int mappedTimes;
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/IWorkflowStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/IWorkflowStateAction.java
index b2f12702d5..5bac4ada66 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/IWorkflowStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/IWorkflowStateAction.java
@@ -43,7 +43,6 @@ import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkf
* @see WorkflowFailedStateAction
* @see WorkflowSuccessStateAction
* @see WorkflowFailoverStateAction
- * @see WorkflowWaitToRunStateAction
*/
public interface IWorkflowStateAction {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowWaitToRunStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowWaitToRunStateAction.java
deleted file mode 100644
index b9af3c2d8c..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowWaitToRunStateAction.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.engine.workflow.statemachine;
-
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPausedLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStoppedLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowSucceedLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
-import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class WorkflowWaitToRunStateAction extends AbstractWorkflowStateAction {
-
- @Override
- public void startEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowStartLifecycleEvent
workflowStartEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowStartEvent);
- }
-
- @Override
- public void topologyLogicalTransitionEventAction(
- final
IWorkflowExecutionRunnable workflowExecutionRunnable,
- final
WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent
workflowTopologyLogicalTransitionWithTaskFinishEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowTopologyLogicalTransitionWithTaskFinishEvent);
- }
-
- @Override
- public void pauseEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowPauseLifecycleEvent
workflowPauseEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowPauseEvent);
- }
-
- @Override
- public void pausedEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowPausedLifecycleEvent
workflowPausedEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowPausedEvent);
- }
-
- @Override
- public void stopEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowStopLifecycleEvent
workflowStopEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowStopEvent);
- }
-
- @Override
- public void stoppedEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowStoppedLifecycleEvent
workflowStoppedEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowStoppedEvent);
- }
-
- @Override
- public void succeedEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowSucceedLifecycleEvent
workflowSucceedEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowSucceedEvent);
- }
-
- @Override
- public void failedEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowFailedLifecycleEvent
workflowFailedEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowFailedEvent);
- }
-
- @Override
- public void finalizeEventAction(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
- final WorkflowFinalizeLifecycleEvent
workflowFinalizeEvent) {
- throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
- logWarningIfCannotDoAction(workflowExecutionRunnable,
workflowFinalizeEvent);
- }
-
- @Override
- public WorkflowExecutionStatus matchState() {
- return WorkflowExecutionStatus.WAIT_TO_RUN;
- }
-
- /**
- * The running state can only finish with success/failure.
- */
- @Override
- protected void emitWorkflowFinishedEventIfApplicable(final
IWorkflowExecutionRunnable workflowExecutionRunnable) {
- log.warn("The workflow: {} is in wait_to_run state, shouldn't emit
workflow finished event",
- workflowExecutionRunnable.getName());
- }
-}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
index b845de41dd..ae50ddaa2a 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
@@ -38,8 +38,6 @@ public interface SubWorkflowService {
List<WorkflowInstance>
filterRunningProcessInstances(List<WorkflowInstance> workflowInstanceList);
- List<WorkflowInstance>
filterWaitToRunProcessInstances(List<WorkflowInstance> workflowInstanceList);
-
List<WorkflowInstance> filterFailedProcessInstances(List<WorkflowInstance>
workflowInstanceList);
List<Property> getWorkflowOutputParameters(WorkflowInstance
workflowInstance);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
index 22d9396d07..cbdc996722 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.subworkflow;
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
@@ -84,13 +83,6 @@ public class SubWorkflowServiceImpl implements
SubWorkflowService {
.filter(subProcessInstance ->
subProcessInstance.getState().isRunning()).collect(Collectors.toList());
}
- @Override
- public List<WorkflowInstance>
filterWaitToRunProcessInstances(List<WorkflowInstance> workflowInstanceList) {
- return workflowInstanceList.stream()
- .filter(subProcessInstance ->
subProcessInstance.getState().equals(WorkflowExecutionStatus.WAIT_TO_RUN))
- .collect(Collectors.toList());
- }
-
@Override
public List<WorkflowInstance>
filterFailedProcessInstances(List<WorkflowInstance> workflowInstanceList) {
return workflowInstanceList.stream()
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannel.java
deleted file mode 100644
index d9f066d773..0000000000
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannel.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api.task;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
-
-public class DynamicLogicTaskChannel extends AbstractLogicTaskChannel {
-
- @Override
- public AbstractParameters parseParameters(String taskParams) {
- return JSONUtils.parseObject(taskParams, DynamicParameters.class);
- }
-}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannelFactory.java
deleted file mode 100644
index 0164e3ba53..0000000000
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannelFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api.task;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
-import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(TaskChannelFactory.class)
-public class DynamicLogicTaskChannelFactory implements TaskChannelFactory {
-
- public static final String NAME = "DYNAMIC";
- @Override
- public String getName() {
- return NAME;
- }
-
- @Override
- public TaskChannel create() {
- return new DynamicLogicTaskChannel();
- }
-}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java
index 26a632c52b..40059d1b1c 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java
@@ -23,7 +23,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.ILogicTaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import
org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory;
import
org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory;
-import
org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory;
import
org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory;
import
org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory;
@@ -50,10 +49,6 @@ public class TaskTypeUtils {
return DependentLogicTaskChannelFactory.NAME.equals(taskType);
}
- public boolean isDynamicTask(String taskType) {
- return DynamicLogicTaskChannelFactory.NAME.equals(taskType);
- }
-
public boolean isLogicTask(String taskType) {
checkArgument(StringUtils.isNotEmpty(taskType), "taskType cannot be
empty");
return TaskPluginManager.getTaskChannel(taskType) instanceof
ILogicTaskChannel;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManagerTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManagerTest.java
index a380af488f..cde0579d2e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManagerTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManagerTest.java
@@ -21,7 +21,6 @@ import static com.google.common.truth.Truth.assertThat;
import
org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory;
import
org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory;
-import
org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory;
import
org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory;
import
org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory;
@@ -34,7 +33,6 @@ class TaskPluginManagerTest {
@ValueSource(strings = {
ConditionsLogicTaskChannelFactory.NAME,
DependentLogicTaskChannelFactory.NAME,
- DynamicLogicTaskChannelFactory.NAME,
SubWorkflowLogicTaskChannelFactory.NAME,
SwitchLogicTaskChannelFactory.NAME})
void testGetTaskChannel_logicTaskChannel(String type) {
diff --git a/dolphinscheduler-ui/public/images/task-icons/dynamic.png
b/dolphinscheduler-ui/public/images/task-icons/dynamic.png
deleted file mode 100644
index 6df7485872..0000000000
Binary files a/dolphinscheduler-ui/public/images/task-icons/dynamic.png and
/dev/null differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png
b/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png
deleted file mode 100644
index b8b43135cc..0000000000
Binary files a/dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png
and /dev/null differ
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts
b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 910b364c01..7e0f0a1314 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -894,9 +894,6 @@ export default {
filter_condition: 'Filter Condition',
params_value: 'Params Value',
separator: 'Separator',
- dynamic_name_tips: 'name(required)',
- dynamic_value_tips: 'params or value(required)',
- dynamic_separator_tips: 'separator(required)',
child_node_definition: 'child node definition',
child_node_instance: 'child node instance',
yarn_queue: 'Yarn Queue',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 58d48b796b..28d0fa6017 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -864,9 +864,6 @@ export default {
filter_condition: '过滤条件',
params_value: '取值参数',
separator: '分隔符',
- dynamic_name_tips: 'name(必填)',
- dynamic_value_tips: 'params or value(必填)',
- dynamic_separator_tips: '分隔符(必填)',
child_node_definition: '子节点定义',
child_node_instance: '子节点实例',
yarn_queue: 'Yarn队列',
diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts
b/dolphinscheduler-ui/src/store/project/task-type.ts
index b655a08149..fee1ab16b6 100644
--- a/dolphinscheduler-ui/src/store/project/task-type.ts
+++ b/dolphinscheduler-ui/src/store/project/task-type.ts
@@ -32,9 +32,6 @@ export const TASK_TYPES_MAP = {
SUB_WORKFLOW: {
alias: 'SUB_WORKFLOW'
},
- DYNAMIC: {
- alias: 'DYNAMIC'
- },
PROCEDURE: {
alias: 'PROCEDURE'
},
diff --git a/dolphinscheduler-ui/src/store/project/types.ts
b/dolphinscheduler-ui/src/store/project/types.ts
index bf2d4df3ce..1c56300ed2 100644
--- a/dolphinscheduler-ui/src/store/project/types.ts
+++ b/dolphinscheduler-ui/src/store/project/types.ts
@@ -23,7 +23,6 @@ type TaskExecuteType = 'STREAM' | 'BATCH'
type TaskType =
| 'SHELL'
| 'SUB_WORKFLOW'
- | 'DYNAMIC'
| 'PROCEDURE'
| 'SQL'
| 'SPARK'
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
b/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
index 66bc7559ae..d8cb54df94 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
@@ -179,9 +179,7 @@ const NodeDetailModal = defineComponent({
},
{
text: t('project.node.enter_this_child_node'),
- show:
- props.data.taskType === 'SUB_WORKFLOW' ||
- props.data.taskType === 'DYNAMIC',
+ show: props.data.taskType === 'SUB_WORKFLOW',
disabled:
!props.data.id ||
(router.currentRoute.value.name === 'workflow-instance-detail' &&
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index cbbe8a7032..45f2c79aa0 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -88,6 +88,5 @@ export { useKubeflow } from './use-kubeflow'
export { useLinkis } from './use-linkis'
export { useDataFactory } from './use-data-factory'
export { useRemoteShell } from './use-remote-shell'
-export { useDynamic } from './use-dynamic'
export { useYarnQueue } from './use-queue'
export { useAliyunServerlessSpark } from './use-aliyun-serverless-spark'
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
deleted file mode 100644
index 08e47c2d98..0000000000
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import type { IJsonItem } from '../types'
-import { useI18n } from 'vue-i18n'
-
-export function useDynamic(model: { [field: string]: any }): IJsonItem[] {
- const { t } = useI18n()
-
- return [
- {
- type: 'input-number',
- field: 'maxNumOfSubWorkflowInstances',
- span: 12,
- name: t('project.node.max_num_of_sub_workflow_instances'),
- validate: {
- required: true
- }
- },
- {
- type: 'input-number',
- field: 'degreeOfParallelism',
- span: 12,
- name: t('project.node.parallelism'),
- validate: {
- required: true
- }
- },
- {
- type: 'custom-parameters',
- field: 'listParameters',
- name: t('project.node.params_value'),
- span: 24,
- children: [
- {
- type: 'input',
- field: 'name',
- span: 8,
- props: {
- placeholder: t('project.node.dynamic_name_tips'),
- maxLength: 256
- },
- validate: {
- trigger: ['input', 'blur'],
- required: true,
- validator(validate: any, value: string) {
- if (!value) {
- return new Error(t('project.node.dynamic_name_tips'))
- }
-
- const sameItems = model['listParameters'].filter(
- (item: { name: string }) => item.name === value
- )
-
- if (sameItems.length > 1) {
- return new Error(t('project.node.prop_repeat'))
- }
- }
- }
- },
- {
- type: 'input',
- field: 'value',
- span: 8,
- props: {
- placeholder: t('project.node.dynamic_value_tips'),
- maxLength: 256
- },
- validate: {
- trigger: ['input', 'blur'],
- required: true,
- validator(validate: any, value: string) {
- if (!value) {
- return new Error(t('project.node.dynamic_value_tips'))
- }
- }
- }
- },
- {
- type: 'input',
- field: 'separator',
- span: 4,
- props: {
- placeholder: t('project.node.dynamic_separator_tips'),
- maxLength: 256
- },
- validate: {
- trigger: ['input', 'blur'],
- required: true,
- validator(validate: any, value: string) {
- if (!value) {
- return new Error(t('project.node.dynamic_separator_tips'))
- }
- }
- }
- }
- ]
- },
- {
- type: 'input',
- field: 'filterCondition',
- span: 24,
- name: t('project.node.filter_condition')
- }
- ]
-}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 57c658dd9a..9e7449c439 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -34,7 +34,7 @@ export function formatParams(data: INodeData): {
} {
const rdbmsSourceTypes = ref(['MYSQL', 'ORACLE', 'SQLSERVER', 'HANA'])
const taskParams: ITaskParams = {}
- if (data.taskType === 'SUB_WORKFLOW' || data.taskType === 'DYNAMIC') {
+ if (data.taskType === 'SUB_WORKFLOW') {
taskParams.workflowDefinitionCode = data.workflowDefinitionCode
}
@@ -454,14 +454,6 @@ export function formatParams(data: INodeData): {
taskParams.datasource = data.datasource
}
- if (data.taskType === 'DYNAMIC') {
- taskParams.workflowDefinitionCode = data.workflowDefinitionCode
- taskParams.maxNumOfSubWorkflowInstances = data.maxNumOfSubWorkflowInstances
- taskParams.degreeOfParallelism = data.degreeOfParallelism
- taskParams.filterCondition = data.filterCondition
- taskParams.listParameters = data.listParameters
- }
-
let timeoutNotifyStrategy = ''
if (data.timeoutNotifyStrategy) {
if (data.timeoutNotifyStrategy.length === 1) {
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index e98d709a44..321b22d83e 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -50,13 +50,11 @@ import { useKubeflow } from './use-kubeflow'
import { useLinkis } from './use-linkis'
import { useDataFactory } from './use-data-factory'
import { useRemoteShell } from './use-remote-shell'
-import { useDynamic } from './use-dynamic'
import { useAliyunServerlessSpark } from './use-aliyun-serverless-spark'
export default {
SHELL: useShell,
SUB_WORKFLOW: useSubWorkflow,
- DYNAMIC: useDynamic,
PYTHON: usePython,
SPARK: useSpark,
MR: useMr,
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts
deleted file mode 100644
index 8d20d87c9f..0000000000
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { reactive } from 'vue'
-import { useRouter } from 'vue-router'
-import * as Fields from '../fields/index'
-import type { IJsonItem, INodeData } from '../types'
-import { ITaskData } from '../types'
-
-export function useDynamic({
- projectCode,
- from = 0,
- readonly,
- data
-}: {
- projectCode: number
- from?: number
- readonly?: boolean
- data?: ITaskData
-}) {
- const router = useRouter()
- const workflowCode = router.currentRoute.value.params.code
- const model = reactive({
- taskType: 'DYNAMIC',
- name: '',
- flag: 'YES',
- description: '',
- timeoutFlag: false,
- localParams: [],
- environmentCode: null,
- failRetryInterval: 1,
- failRetryTimes: 0,
- workerGroup: 'default',
- delayTime: 0,
- timeout: 30,
- maxNumOfSubWorkflowInstances: 1024,
- degreeOfParallelism: 1,
- filterCondition: '',
- listParameters: [{ name: null, value: null, separator: ',' }]
- } as INodeData)
-
- if (model.listParameters?.length) {
- model.listParameters[0].disabled = true
- }
-
- return {
- json: [
- Fields.useName(from),
- ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model
}),
- Fields.useRunFlag(),
- Fields.useDescription(),
- Fields.useTaskPriority(),
- Fields.useWorkerGroup(projectCode),
- Fields.useEnvironmentName(model, !data?.id),
- ...Fields.useTaskGroup(model, projectCode),
- ...Fields.useTimeoutAlarm(model),
- Fields.useChildNode({
- model,
- projectCode,
- from,
- workflowName: data?.workflowDefinitionName,
- code: from === 1 ? 0 : Number(workflowCode)
- }),
- ...Fields.useDynamic(model),
- Fields.usePreTasks()
- ] as IJsonItem[],
- model
- }
-}
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index aeea696dd8..8dc08a02d3 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -17,7 +17,6 @@
export type TaskType =
| 'SHELL'
| 'SUB_WORKFLOW'
- | 'DYNAMIC'
| 'PROCEDURE'
| 'SQL'
| 'SPARK'
@@ -65,9 +64,6 @@ export const TASK_TYPES_MAP = {
SUB_WORKFLOW: {
alias: 'SUB_WORKFLOW'
},
- DYNAMIC: {
- alias: 'DYNAMIC'
- },
PROCEDURE: {
alias: 'PROCEDURE'
},
diff --git
a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 217059b0ed..7dc3ac0249 100644
---
a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++
b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -107,9 +107,6 @@ $bgLight: #ffffff;
&.icon-sub_workflow {
background-image: url('/images/task-icons/sub_workflow.png');
}
- &.icon-dynamic {
- background-image: url('/images/task-icons/dynamic.png');
- }
&.icon-procedure {
background-image: url('/images/task-icons/procedure.png');
}
@@ -220,9 +217,6 @@ $bgLight: #ffffff;
&.icon-sub_workflow {
background-image: url('/images/task-icons/sub_workflow_hover.png');
}
- &.icon-dynamic {
- background-image: url('/images/task-icons/dynamic_hover.png');
- }
&.icon-procedure {
background-image: url('/images/task-icons/procedure_hover.png');
}