According to the results of the discussion, the plan has been re-optimized
and related development work:

## 1. Currently
The workflow definition of the current DS system includes task definition
data and task relationship data. In the design of the database, task data
and task relationship data are stored in the workflow as a string type
field (process_definition_json) Definition table (t_ds_process_definition).

With the increase of workflow and tasks, the following problems will arise:

-Task data, relational data and workflow data are coupled together, which
is not friendly to the scenario of single-task scheduling. The task must be
created in the workflow

-The task cannot be reused because the task is created in the workflow

-The maintenance cost is high. If you move the whole body and modify any
task, you need to update the data in the workflow as a whole, and it also
increases the log cost

-When there are many tasks in the workflow, the efficiency of global search
and statistical analysis is low, such as querying which tasks use which
data source

-Poor scalability, for example, the realization of blood relationship
function in the future will only lead to more and more bloated workflow
definitions

-Tasks, relationships, and workflow boundaries are blurred. Condition nodes
and delay nodes are also regarded as a task, which is actually a
combination of relationships and conditions

Based on the above pain points, we need to redefine the business boundaries
of tasks, relationships, and workflows, and redesign their data structures
based on this

## 2. Design Ideas

### 2.1 Workflow, relationship, job

First of all, we set aside the current implementation and clarify the
business boundaries of tasks (the subsequent description is changed to
jobs), relationships, and workflows, and how to decouple

-Job: the task to be executed by the scheduling system, the job only
contains the data and resources needed to execute the job
-Relationship: the relationship between the job and the job and the
execution conditions, including the execution relationship (after A
completes, execute B) and execution conditions (after A completes and
succeeds, execute B; after A completes and fails, execute C; A completes 30
After minutes, execute D)
-Workflow: the carrier of a set of relationships, the workflow only saves
the relationships between jobs (DAG is a form of presentation of workflow,
a way to create relationships)

Combined with the functions supported by the current DS, we can make a
classification

-Job: Dependency check, sub-process, Shell, stored procedure, Sql, Spark,
Flink, MR, Python, Http, DataX, Sqoop
-Relationship: serial execution, parallel execution, aggregate execution,
conditional branch, delayed execution
-Workflow: the boundary of scheduling execution, including a set of
relationships

#### 2.1.1 Further refinement

The job definition data is not much different from the current job
definition data. Both are composed of public fields and custom fields. You
only need to remove the fields related to the relationship.

The workflow definition data is not much different from the current
workflow definition data, just remove the json field.

Relational data, we can abstract into two nodes and one path according to
classification. The node is the job. The path includes the conditional
rules that need to be met from the pre-node to the post-node. The
conditional rules include: unconditional, judgment condition, and delay
condition.

### 2.2 Version Management

We clarify the business boundaries. After decoupling, they become a
reference relationship. The workflow and the relationship are one-to-many,
and the relationship and the job are one-to-many. The definition data also
needs to save the version record, which can support the restoration of
historical data in the future.

So the design idea here is:

To define data, you need to add a version field

The definition table needs to add the corresponding log table

When creating definition data, double write to the definition table and log
table. When modifying the definition data, save the modified version to the
log table

There is no need to save version information in the reference data of the
definition table (quote the latest version)

### 2.3 Example data

The current DB design already has workflow instance tables and task
instance tables, and DS currently supports data changes in instance tables.
The instance table cannot only save the code and version information of the
definition table, but also needs to maintain detailed definition data.
Therefore, it is necessary to split the workflow instance table into a
workflow instance table and a job relationship table, and the task instance
table is generally unchanged. The fields of the three instance tables are
basically the same as those of the definition table.

### 2.4 Business Logo Design

Here is also involved in the import and export of workflow and job
definition data. According to the previous community discussion, business
identification needs to be introduced. Each data in the workflow definition
table and job definition table will have a business identification,
relationship definition data, and dependent jobs Establish a reference
relationship with the sub-workflow job through the business identifier. The
specific realization of the business logo is the voting result of the plan
to be designed.

Related Issues:
https://github.com/apache/incubator-dolphinscheduler/issues/3820

Design plan:

## 3. Design plan

### 3.1 Table model design

#### 3.1.1 Workflow definition table: t_ds_process_definithon

| Column Name | Description |
| ---- | ---- |
| id | Self-incrementing ID |
| code | Code (the original name field) |
| version | Version |
| description | description |
| project_code | Project code |
| release_state | Release state |
| user_id | Owning user ID |
| global_params | Global parameters |
| flag | Whether the process is available: 0 is not available, 1 is
available |
| receivers | recipients |
| receivers_cc | CC |
| timeout | Timeout time |
| tenant_id | tenant ID |
| locations | Node coordinate information |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.2 Workflow job relationship table: t_ds_process_task_relation

Note: The last node has unconditional data and post data. Here you can
imagine the two ends of a line, the left is the front node, the middle is
the condition, and the right is the post node.

| Column Name | Description |
| ----------------------- | ------------------------- ------------- |
| id | Self-incrementing ID |
| project_code | Project code |
| process_definition_code | Workflow coding |
| pre_project_code | Pre-quoted project code |
| pre_task_code | Pre-reference job code |
| condition_type | Condition type 0: None 1: Judgment 2: Delayed |
| condition_params | Condition parameters (json) |
| post_project_code | Post reference project code |
| post_task_code | Post reference job code |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.3 Job definition table: t_ds_task_definithon

| Column Name | Description |
| ----------------------- | -------------- |
| id | Self-incrementing ID |
| code | Code (the original name field) |
| version | Version |
| description | description |
| project_code | Project code |
| task_type | Job type |
| task_params | Job custom parameters |
| run_flag | Run flag |
| task_priority | Job priority |
| worker_group | worker group |
| fail_retry_times | Number of failed retries |
| fail_retry_interval | Failure retry interval |
| timeout_flag | Timeout flag |
| timeout_notify_strategy | Timeout notification strategy |
| timeout_duration | Timeout duration |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.4 Workflow definition log table: t_ds_process_definithon_log

Add operation type (add, modify, delete), operator, and operation time
based on the workflow definition table

#### 3.1.5 Workflow job relationship log table:
t_ds_process_task_relation_log

Add workflow version, operation type (add, modify, delete), operator,
operation time based on the job relationship table

#### 3.1.6 Job definition log table: t_ds_task_definithon_log

Add operation type (add, modify, delete), operator, and operation time
based on the job definition table

### 3.2 Master-Worker scheduling design

When the Master schedules the workflow, it queries the workflow details and
all job relationship data according to the project code and workflow code
(job data is not loaded here), generates a DAG, traverses the DAG job, and
sends the project code and job code to the Worker. Project code, job code
query detailed job data and execute the job

## 4. Related work split

### 4.1 Frontend

Added job management related functions, including: job list, job creation,
update, delete, view details operations

To create a workflow page, you need to pass workflow information, job
relationship information, and job information to the back-end API layer to
save/update

Workflow page, when dragging and dropping task nodes, it also supports
reference project-job (default current search job under current project)
and create job operation

### 4.2 API layer

Added job data related processing interface, including version processing
(query, create, modify, delete, online and offline...)

Refactored workflow data related processing interface, including version
processing (query, create, modify, delete, import, export, online and
offline...)

Refactored the processing interface of workflow instance data (query,
modify, Gantt chart)

Refactoring job instance query interface

Refactored workflow instance, job instance related statistical interface
(UI system homepage, project homepage statistical data, related monitoring
data)

### 3.3 Master

Rebuild Master according to the <3.2 Master-Worker Scheduling Design> scheme

### 3.4 Worker

Refactor Worker according to the <3.2 Master-Worker Scheduling Design>
scheme

------------------------------------------------------------------------------------------------------------------------------------------

根据讨论结果,重新优化了方案,还有相关开发工作

## 1.现状
当前DS系统的工作流定义包含了任务定义数据和任务之间关系数据,并且在数据库的设计上,任务数据和任务关系数据是以一个字符串类型字段(process_definition_json)的方式,保存在工作流定义表(t_ds_process_definition)中。

随着工作流和任务的增加,会产生如下问题:

- 任务数据、关系数据和工作流数据耦合在一起,对单任务调度的场景不友好,任务必须创建在工作流内

- 任务无法复用,因为任务是创建在工作流内的

- 维护成本高,牵一发动全身,修改任何一个任务,都需要整体更新工作流内数据,同时也增加了日志成本

- 工作流内任务较多时,全局搜索和统计分析效率低,例如查询哪些任务用到了哪个数据源

- 扩展性差,例如未来要实现血缘功能,只会导致工作流定义越来越臃肿

- 任务、关系、工作流边界模糊,条件节点、延迟节点也被当做一种任务,实际是关系与条件的组合

基于以上痛点,我们需要重新定义任务、关系、工作流的业务边界,基于此重新设计它们的数据结构

## 2.设计思路

### 2.1 工作流、关系、作业

首先,我们抛开当前的实现,明确任务(后续描述更改为作业)、关系、工作流的业务边界,如何去解耦

- 作业:调度系统要执行的任务,作业内只包含执行作业所需要的数据和资源
-
关系:作业与作业之间的关系以及执行条件,包含执行关系(A完成后,执行B)和执行条件(A完成并成功后,执行B;A完成并失败后,执行C;A完成30分钟后,执行D)
- 工作流:一组关系的载体,工作流只保存作业间的关系(DAG是工作流的一种展示形式,创建关系的一种方式)

结合当前DS支持的功能,我们可以做一个分类

- 作业:依赖检查、子流程、Shell、存储过程、Sql、Spark、Flink、MR、Python、Http、DataX、Sqoop
- 关系:串行执行、并行执行、聚合执行、条件分支、延迟执行
- 工作流:调度执行的边界,包含一组关系

#### 2.1.1 进一步细化

作业定义数据,和当前的作业定义数据差别不大,都是由公共字段和自定义字段组成,只需要去掉关系相关的字段就可以了。

工作流定义数据,和当前的工作流定义数据差别也不大,去掉json字段就可以了。

关系数据,我们根据分类可以抽象为两个节点和一个路径。节点就是作业,路径包含前置节点到后置节点需要满足的条件规则是什么,条件规则包含:无条件、判断条件、延迟条件。

### 2.2 版本管理

我们明确业务边界,解耦后它们之间就变成了引用关系,工作流和关系之间是一对多,关系和作业之间是一对多。定义数据还需要保存版本记录,后续可以支持恢复历史数据。

所以这里的设计思路是:

定义数据需要增加版本字段

定义表需要增加对应的日志表

创建定义数据时,双写到定义表和日志表,修改定义数据时,保存修改后的版本到日志表

定义表的引用数据中不需要保存版本信息(引用最新版本)

### 2.3 实例数据

当前DB设计已经有工作流实例表、任务实例表,而且DS当前支持实例表的数据变更。实例表不能仅保存定义表的编码和版本信息,还需要维护详细的定义数据。所以需要将工作流实例表拆分为工作流实例表和作业关系表,任务实例表总体不变。三张实例表的字段和定义表字段基本一致。

### 2.4 业务标识设计

这里还涉及到工作流、作业定义数据导入导出问题,根据之前社区讨论的方案,需要引入业务标识,工作流定义表和作业定义表每条数据都会有一个业务标识,关系定义数据、依赖作业和子工作流作业内部通过业务标识建立引用关系。业务标识的具体实现待设计方案的投票结果。

相关Issue:https://github.com/apache/incubator-dolphinscheduler/issues/3820

设计方案:

## 3.设计方案

### 3.1 表模型设计

#### 3.1.1 工作流定义表:t_ds_process_definithon

| 列名 | 描述 |
| ---- | ---- |
| id            | 自增ID                         |
| code          | 编码(原name字段)               |
| version       | 版本                           |
| description   | 描述                           |
| project_code  | 项目编码                       |
| release_state | 发布状态                       |
| user_id       | 所属用户ID                     |
| global_params | 全局参数                       |
| flag          | 流程是否可用:0 不可用,1 可用 |
| receivers     | 收件人                         |
| receivers_cc  | 抄送人                         |
| timeout       | 超时时间                       |
| tenant_id     | 租户ID                         |
| locations     | 节点坐标信息                    |
| create_time   | 创建时间                       |
| update_time   | 修改时间                       |

#### 3.1.2 工作流作业关系表:t_ds_process_task_relation

注:最后一个节点无条件数据和后置数据,这里可以想象一条线的两端,左边是前置节点,中间是条件,右边是后置节点

| 列名                    | 描述                                   |
| ----------------------- | -------------------------------------- |
| id                      | 自增ID                                 |
| project_code            | 项目编码                               |
| process_definition_code | 工作流编码                             |
| pre_project_code        | 前置引用项目编码                        |
| pre_task_code           | 前置引用作业编码                        |
| condition_type          | 条件类型 0:无 1:判断 2:延迟            |
| condition_params        | 条件参数(json)                        |
| post_project_code       | 后置引用项目编码                        |
| post_task_code          | 后置引用作业编码                        |
| create_time             | 创建时间                               |
| update_time             | 修改时间                               |

#### 3.1.3 作业定义表:t_ds_task_definithon

| 列名                    | 描述           |
| ----------------------- | -------------- |
| id                      | 自增ID         |
| code                    | 编码(原name字段) |
| version                 | 版本           |
| description             | 描述           |
| project_code            | 项目编码        |
| task_type               | 作业类型       |
| task_params             | 作业自定义参数 |
| run_flag                | 运行标志       |
| task_priority           | 作业优先级     |
| worker_group            | worker分组     |
| fail_retry_times        | 失败重试次数   |
| fail_retry_interval     | 失败重试间隔   |
| timeout_flag            | 超时标志       |
| timeout_notify_strategy | 超时通知策略   |
| timeout_duration        | 超时时长       |
| create_time             | 创建时间       |
| update_time             | 修改时间       |

#### 3.1.4 工作流定义日志表:t_ds_process_definithon_log

工作流定义表基础上增加操作类型(新增、修改、删除)、操作人、操作时间

#### 3.1.5 工作流作业关系日志表:t_ds_process_task_relation_log

作业关系表基础上增加工作流版本、操作类型(新增、修改、删除)、操作人、操作时间

#### 3.1.6 作业定义日志表:t_ds_task_definithon_log

作业定义表基础上增加操作类型(新增、修改、删除)、操作人、操作时间

### 3.2 Master-Worker调度设计

Master调度工作流时,根据项目编码、工作流编码查询工作流详细信息和所有作业关系数据(这里不加载作业数据),生成DAG,遍历DAG作业,发送项目编码、作业编码到Worker,Worker根据项目编码、作业编码查询作业详细数据并执行作业

## 4.相关工作拆分

### 4.1 前端

增加作业管理相关功能,包括:作业列表,作业的创建、更新、删除、查看详情操作

创建工作流页面,需要将工作流信息、作业关系信息、作业信息传给后端API层保存/更新

工作流页面,拖拽任务节点时,同时支持引用项目-作业(默认当前搜索当前项目下作业)和创建作业操作

### 4.2 API层

增加作业数据相关处理接口,包含版本处理(查询、新建、修改、删除、上下线...)

重构工作流数据相关处理接口,包含版本处理(查询、新建、修改、删除、导入、导出、上下线...)

重构工作流实例数据相关处理接口(查询、修改、甘特图)

重构作业实例查询接口

重构工作流实例、作业实例相关统计接口(UI系统首页、项目首页统计数据、相关监控数据)

### 3.3 Master

根据《3.2 Master-Worker调度设计》方案重构Master

### 3.4 Worker

根据《3.2 Master-Worker调度设计》方案重构Worker

--------------------
DolphinScheduler(Incubator) Commtter
Hemin Wen  温合民
[email protected]
--------------------


Hemin Wen <[email protected]> 于2020年11月25日周三 上午10:01写道:

> Hi!
>
> About json splitting of workflow definition, The following is the design
> plan for splitting three tables.
>
> Everyone can discuss together.
>
>
> --------------------------------------------------------------------------------------------------------------
>
> ## 1. Currently
> The workflow definition of the current DS system includes task definition
> data and task relationship data. In the design of the database, task data
> and task relationship data are stored in the workflow as a string type
> field (process_definition_json) Definition table (t_ds_process_definition).
>
> With the increase of workflow and tasks, the following problems will arise:
>
> -Task data, relational data and workflow data are coupled together, which
> is not friendly to the scenario of single-task scheduling. The task must be
> created in the workflow
>
> -The task cannot be reused because the task is created in the workflow
>
> -The maintenance cost is high. If you move the whole body and modify any
> task, you need to update the data in the workflow as a whole, and it also
> increases the log cost
>
> -When there are many tasks in the workflow, the efficiency of global
> search and statistical analysis is low, such as querying which tasks use
> which data source
>
> -Poor scalability, for example, the realization of blood relationship
> function in the future will only lead to more and more bloated workflow
> definitions
>
> -Tasks, relationships, and workflow boundaries are blurred. Condition
> nodes and delay nodes are also regarded as a task, which is actually a
> combination of relationships and conditions
>
> Based on the above pain points, we need to redefine the business
> boundaries of tasks, relationships, and workflows, and redesign their data
> structures based on this
>
> ## 2. Design Ideas
>
> ### 2.1 Workflow, relation, job
>
> First of all, we set aside the current implementation and clarify the
> business boundaries of tasks (the subsequent description is changed to
> jobs), relationships, and workflows, and how to decouple
>
> -Job: the task that the scheduling system really needs to execute, the job
> only contains the data and resources needed to execute the job
> -relation: the relationship between the job and the job and the execution
> conditions, including the execution relationship (after A completes,
> execute B) and execution conditions (after A completes and succeeds,
> execute B; after A completes and fails, execute C; A completes 30 After
> minutes, execute D)
> -Workflow: the carrier of a set of relationships, the workflow only saves
> the relationships between jobs (DAG is a display form of workflow, a way to
> create relationships)
>
> Combined with the functions supported by the current DS, we can make a
> classification
>
> -Job: Dependency check, sub-process, Shell, stored procedure, Sql, Spark,
> Flink, MR, Python, Http, DataX, Sqoop
> -Relationship: serial execution, parallel execution, aggregate execution,
> conditional branch, delayed execution
> -Workflow: the boundary of scheduling execution, including a set of
> relationships
>
> #### 2.1.1 Further refinement
>
> The job definition data is not much different from the current job
> definition data. Both are composed of public fields and custom fields. You
> only need to remove the fields related to the relationship.
>
> The workflow definition data is not much different from the current
> workflow definition data, just remove the json field.
>
> Relational data, we can abstract into two nodes and one path according to
> classification. The node is the job, and the path includes the conditional
> rules that need to be met from the pre-node to the post-node. The
> conditional rules include: unconditional, judgment condition, and delay
> condition.
>
> ### 2.2 Version Management
>
> We clarify the business boundaries. After decoupling, they become a
> reference relationship. The workflow and the relationship are one-to-many,
> and the relationship and the job are one-to-many. Not only is the
> definition of data, we also need to consider instance data. Every time a
> workflow is scheduled and executed, a workflow instance will be generated.
> Jobs and workflows can be changed, and the workflow instance must support
> viewing, rerun, recovery failure, etc. . This requires the introduction of
> version management of the definition data. Every time workflow,
> relationship, and job changes need to save old version data and generate
> new version data.
>
> So the design idea here is:
>
> To define data, you need to add a version field
>
> The definition table needs to add the corresponding log table
>
> When creating definition data, double write to the definition table and
> log table. When modifying the definition data, save the modified version to
> the log table
>
> There is no need to save version information in the reference data of the
> definition table (refer to the latest version), and the version information
> at the time of execution is saved in the instance data
>
> ### 2.3 Coding Design
>
> This also involves the import and export of workflow and job definition
> data. According to the previous community discussion, a coding scheme needs
> to be introduced. Each piece of data in workflow, relationship, and job
> will have a unique code. Related Issues: https://github
> .com/apache/incubator-dolphinscheduler/issues/3820
>
> Resource: RESOURCE_xxx
>
> Task: TASK_xxx
>
> Relation: RELATION_xxx
>
> Workflow: PROCESS_xxx
>
> Project: PROJECT_xxx
>
> ## 3. Design plan
>
> ### 3.1 Table model design
>
> #### 3.1.1 Job definition table: t_ds_task_definithon
>
> | Column Name | Description |
> | ----------------------- | -------------- |
> | id | Self-incrementing ID |
> | union_code | unique code |
> | version | Version |
> | name | Job name |
> | description | description |
> | task_type | Job type |
> | task_params | Job custom parameters |
> | run_flag | Run flag |
> | task_priority | Job priority |
> | worker_group | worker group |
> | fail_retry_times | Number of failed retries |
> | fail_retry_interval | Failure retry interval |
> | timeout_flag | Timeout flag |
> | timeout_notify_strategy | Timeout notification strategy |
> | timeout_duration | Timeout duration |
> | create_time | Creation time |
> | update_time | Modification time |
>
> #### 3.1.2 Task relation table: t_ds_task_relation
>
> | Column Name | Description |
> | ----------------------- | ------------------------- ------------- |
> | id | Self-incrementing ID |
> | union_code | unique code |
> | version | Version |
> | process_definition_code | Workflow coding |
> | node_code | Node code (workflow code/job code) |
> | post_node_code | Post node code (workflow code/job code) |
> | condition_type | Condition type 0: None 1: Judgment condition 2: Delay
> condition |
> | condition_params | Condition parameters |
> | create_time | Creation time |
> | update_time | Modification time |
>
> #### 3.1.3 Workflow definition table: t_ds_process_definithon
>
> | Column Name | Description |
> | ---- | ---- |
> | id | Self-incrementing ID |
> | union_code | unique code |
> | version | Version |
> | name | Workflow name |
> | project_code | Project code |
> | release_state | Release state |
> | user_id | Owning user ID |
> | description | description |
> | global_params | Global parameters |
> | flag | Whether the process is available: 0 is not available, 1 is
> available |
> | receivers | recipients |
> | receivers_cc | CC |
> | timeout | Timeout time |
> | tenant_id | tenant ID |
> | create_time | Creation time |
> | update_time | Modification time |
>
> #### 3.1.4 Job definition log table: t_ds_task_definithon_log
>
> Add operation type (add, modify, delete), operator, and operation time
> based on the job definition table
>
> #### 3.1.5 Job relation log table: t_ds_task_relation_log
>
> Add operation type (add, modify, delete), operator, and operation time
> based on the job relationship table
>
> #### 3.1.6 Workflow definition log table: t_ds_process_definithon_log
>
> Add operation type (add, modify, delete), operator, and operation time
> based on the workflow definition table
>
> ### 3.2 Frontend
>
> *The design here is just a personal idea, and the front-end help is needed
> to design the interaction*
>
> Need to add job management related functions, including: job list, job
> creation, update, delete, view details operations
>
> To create a workflow page, you need to split json into workflow definition
> data and job relationship data to the back-end API layer to save/update
>
> Workflow page, when dragging task nodes, add reference job options
>
> The conditional branch nodes and delay nodes need to be resolved into the
> conditional rule data in the relationship; conversely, the conditional rule
> data returned by the backend needs to be displayed as the corresponding
> node when querying the workflow
>
> ### 3.3 Master
>
> When the Master schedules the workflow, you need to modify <Build dag from
> json> to <Build dag from relational data>. When executing a workflow, first
> load the relational data in full (no job data is loaded here), generate
> DAG, and traverse DAG execution , And then get the job data that needs to
> be executed
>
> Other execution processes are consistent with existing processes
>
>
> --------------------------------------------------------------------------------------------------------------
>
> ## 1.现状
>
> 当前DS系统的工作流定义包含了任务定义数据和任务之间关系数据,并且在数据库的设计上,任务数据和任务关系数据是以一个字符串类型字段(process_definition_json)的方式,保存在工作流定义表(t_ds_process_definition)中。
>
> 随着工作流和任务的增加,会产生如下问题:
>
> - 任务数据、关系数据和工作流数据耦合在一起,对单任务调度的场景不友好,任务必须创建在工作流内
>
> - 任务无法复用,因为任务是创建在工作流内的
>
> - 维护成本高,牵一发动全身,修改任何一个任务,都需要整体更新工作流内数据,同时也增加了日志成本
>
> - 工作流内任务较多时,全局搜索和统计分析效率低,例如查询哪些任务用到了哪个数据源
>
> - 扩展性差,例如未来要实现血缘功能,只会导致工作流定义越来越臃肿
>
> - 任务、关系、工作流边界模糊,条件节点、延迟节点也被当做一种任务,实际是关系与条件的组合
>
> 基于以上痛点,我们需要重新定义任务、关系、工作流的业务边界,基于此重新设计它们的数据结构
>
> ## 2.设计思路
>
> ### 2.1 工作流、关系、作业
>
> 首先,我们抛开当前的实现,明确任务(后续描述更改为作业)、关系、工作流的业务边界,如何去解耦
>
> - 作业:调度系统要执行的任务,作业内只包含执行作业所需要的数据和资源
> -
> 关系:作业与作业之间的关系以及执行条件,包含执行关系(A完成后,执行B)和执行条件(A完成并成功后,执行B;A完成并失败后,执行C;A完成30分钟后,执行D)
> - 工作流:一组关系的载体,工作流只保存作业间的关系(DAG是工作流的一种展示形式,创建关系的一种方式)
>
> 结合当前DS支持的功能,我们可以做一个分类
>
> - 作业:依赖检查、子流程、Shell、存储过程、Sql、Spark、Flink、MR、Python、Http、DataX、Sqoop
> - 关系:串行执行、并行执行、聚合执行、条件分支、延迟执行
> - 工作流:调度执行的边界,包含一组关系
>
> #### 2.1.1 进一步细化
>
> 作业定义数据,和当前的作业定义数据差别不大,都是由公共字段和自定义字段组成,只需要去掉关系相关的字段就可以了。
>
> 工作流定义数据,和当前的工作流定义数据差别也不大,去掉json字段就可以了。
>
>
> 关系数据,我们根据分类可以抽象为两个节点和一个路径。节点就是作业,路径包含前置节点到后置节点需要满足的条件规则是什么,条件规则包含:无条件、判断条件、延迟条件。
>
> ### 2.2 版本管理
>
>
> 我们明确业务边界,解耦后它们之间就变成了引用关系,工作流和关系之间是一对多,关系和作业之间是一对多。不仅是定义数据,我们还要考虑实例数据,每次工作流的调度执行都会产生工作流实例,作业和工作流都是可以变更的,而工作流实例又要支持查看、重跑、恢复失败等。这就需要引入定义数据的版本管理了。每一次工作流、关系、作业变更都需要保存旧版本数据,生成新版本数据。
>
> 所以这里的设计思路是:
>
> 定义数据需要增加版本字段
>
> 定义表需要增加对应的日志表
>
> 创建定义数据时,双写到定义表和日志表,修改定义数据时,保存修改后的版本到日志表
>
> 定义表的引用数据中不需要保存版本信息(引用最新版本),实例数据中保存执行时的版本信息
>
> ### 2.3 编码设计
>
> 这里还涉及到工作流、作业定义数据导入导出问题,根据之前社区讨论的方案,需要引入编码方案,工作流、关系、作业每条数据都会有一个唯一编码,相关Issue:
> https://github.com/apache/incubator-dolphinscheduler/issues/3820
>
> 资源:RESOURCE_xxx
>
> 作业:TASK_xxx
>
> 关系:RELATION_xxx
>
> 工作流:PROCESS_xxx
>
> 项目:PROJECT_xxx
>
> ## 3.设计方案
>
> ### 3.1 表模型设计
>
> #### 3.1.1 作业定义表:t_ds_task_definithon
>
> | 列名                    | 描述           |
> | ----------------------- | -------------- |
> | id                      | 自增ID         |
> | union_code              | 唯一编码       |
> | version                 | 版本           |
> | name                    | 作业名称       |
> | description             | 描述           |
> | task_type               | 作业类型       |
> | task_params             | 作业自定义参数 |
> | run_flag                | 运行标志       |
> | task_priority           | 作业优先级     |
> | worker_group            | worker分组     |
> | fail_retry_times        | 失败重试次数   |
> | fail_retry_interval     | 失败重试间隔   |
> | timeout_flag            | 超时标志       |
> | timeout_notify_strategy | 超时通知策略   |
> | timeout_duration        | 超时时长       |
> | create_time             | 创建时间       |
> | update_time             | 修改时间       |
>
> #### 3.1.2 作业关系表:t_ds_task_relation
>
> | 列名                    | 描述                                   |
> | ----------------------- | -------------------------------------- |
> | id                      | 自增ID                                 |
> | union_code              | 唯一编码                               |
> | version                 | 版本                                   |
> | process_definition_code | 工作流编码                             |
> | node_code               | 节点编码(工作流编码/作业编码)        |
> | post_node_code          | 后置节点编码(工作流编码/作业编码)    |
> | condition_type          | 条件类型 0:无 1:判断条件 2:延迟条件 |
> | condition_params        | 条件参数                               |
> | create_time             | 创建时间                               |
> | update_time             | 修改时间                               |
>
> #### 3.1.3 工作流定义表:t_ds_process_definithon
>
> | 列名 | 描述 |
> | ---- | ---- |
> | id            | 自增ID                         |
> | union_code    | 唯一编码                       |
> | version       | 版本                           |
> | name          | 工作流名称                     |
> | project_code  | 项目编码                       |
> | release_state | 发布状态                       |
> | user_id       | 所属用户ID                     |
> | description   | 描述                           |
> | global_params | 全局参数                       |
> | flag          | 流程是否可用:0 不可用,1 可用 |
> | receivers     | 收件人                         |
> | receivers_cc  | 抄送人                         |
> | timeout       | 超时时间                       |
> | tenant_id     | 租户ID                         |
> | create_time   | 创建时间                       |
> | update_time   | 修改时间                       |
>
> #### 3.1.4 作业定义日志表:t_ds_task_definithon_log
>
> 作业定义表基础上增加操作类型(新增、修改、删除)、操作人、操作时间
>
> #### 3.1.5 作业关系日志表:t_ds_task_relation_log
>
> 作业关系表基础上增加操作类型(新增、修改、删除)、操作人、操作时间
>
> #### 3.1.6 工作流定义日志表:t_ds_process_definithon_log
>
> 工作流定义表基础上增加操作类型(新增、修改、删除)、操作人、操作时间
>
> ### 3.2 前端
>
> *这里的设计只是个人想法,交互上还需要前端帮助设计下*
>
> 需要增加作业管理相关功能,包括:作业列表,作业的创建、更新、删除、查看详情操作
>
> 创建工作流页面,需要将json拆分为工作流定义数据、作业关系数据传给后端API层保存/更新
>
> 工作流页面,拖拽任务节点时,增加引用作业选项
>
> 条件分支节点、延迟节点需要解析为关系中的条件规则数据;反之,查询工作流时需要将后端返回的条件规则数据展示为对应的节点
>
> ### 3.3 Master
>
>
> Master调度工作流时,需要将<从json构建dag>修改为<从关系数据构建dag>,执行一个工作流时先全量加载关系数据(这里不加载作业数据),生成DAG,遍历DAG执行时,再获取需要执行的作业数据
>
> 其他执行流程和现有流程一致
>
> --------------------
> DolphinScheduler(Incubator) Commtter
> Hemin Wen  温合民
> [email protected]
> --------------------
>

Reply via email to