Hi! About json splitting of workflow definition, The following is the design plan for splitting three tables.
Everyone can discuss together. -------------------------------------------------------------------------------------------------------------- ## 1. Currently The workflow definition of the current DS system includes task definition data and task relationship data. In the design of the database, task data and task relationship data are stored in the workflow as a string type field (process_definition_json) Definition table (t_ds_process_definition). With the increase of workflow and tasks, the following problems will arise: -Task data, relational data and workflow data are coupled together, which is not friendly to the scenario of single-task scheduling. The task must be created in the workflow -The task cannot be reused because the task is created in the workflow -The maintenance cost is high. If you move the whole body and modify any task, you need to update the data in the workflow as a whole, and it also increases the log cost -When there are many tasks in the workflow, the efficiency of global search and statistical analysis is low, such as querying which tasks use which data source -Poor scalability, for example, the realization of blood relationship function in the future will only lead to more and more bloated workflow definitions -Tasks, relationships, and workflow boundaries are blurred. Condition nodes and delay nodes are also regarded as a task, which is actually a combination of relationships and conditions Based on the above pain points, we need to redefine the business boundaries of tasks, relationships, and workflows, and redesign their data structures based on this ## 2. Design Ideas ### 2.1 Workflow, relation, job First of all, we set aside the current implementation and clarify the business boundaries of tasks (the subsequent description is changed to jobs), relationships, and workflows, and how to decouple -Job: the task that the scheduling system really needs to execute, the job only contains the data and resources needed to execute the job -relation: the relationship between the job and the job and the execution conditions, including the execution relationship (after A completes, execute B) and execution conditions (after A completes and succeeds, execute B; after A completes and fails, execute C; A completes 30 After minutes, execute D) -Workflow: the carrier of a set of relationships, the workflow only saves the relationships between jobs (DAG is a display form of workflow, a way to create relationships) Combined with the functions supported by the current DS, we can make a classification -Job: Dependency check, sub-process, Shell, stored procedure, Sql, Spark, Flink, MR, Python, Http, DataX, Sqoop -Relationship: serial execution, parallel execution, aggregate execution, conditional branch, delayed execution -Workflow: the boundary of scheduling execution, including a set of relationships #### 2.1.1 Further refinement The job definition data is not much different from the current job definition data. Both are composed of public fields and custom fields. You only need to remove the fields related to the relationship. The workflow definition data is not much different from the current workflow definition data, just remove the json field. Relational data, we can abstract into two nodes and one path according to classification. The node is the job, and the path includes the conditional rules that need to be met from the pre-node to the post-node. The conditional rules include: unconditional, judgment condition, and delay condition. ### 2.2 Version Management We clarify the business boundaries. After decoupling, they become a reference relationship. The workflow and the relationship are one-to-many, and the relationship and the job are one-to-many. Not only is the definition of data, we also need to consider instance data. Every time a workflow is scheduled and executed, a workflow instance will be generated. Jobs and workflows can be changed, and the workflow instance must support viewing, rerun, recovery failure, etc. . This requires the introduction of version management of the definition data. Every time workflow, relationship, and job changes need to save old version data and generate new version data. So the design idea here is: To define data, you need to add a version field The definition table needs to add the corresponding log table When creating definition data, double write to the definition table and log table. When modifying the definition data, save the modified version to the log table There is no need to save version information in the reference data of the definition table (refer to the latest version), and the version information at the time of execution is saved in the instance data ### 2.3 Coding Design This also involves the import and export of workflow and job definition data. According to the previous community discussion, a coding scheme needs to be introduced. Each piece of data in workflow, relationship, and job will have a unique code. Related Issues: https://github .com/apache/incubator-dolphinscheduler/issues/3820 Resource: RESOURCE_xxx Task: TASK_xxx Relation: RELATION_xxx Workflow: PROCESS_xxx Project: PROJECT_xxx ## 3. Design plan ### 3.1 Table model design #### 3.1.1 Job definition table: t_ds_task_definithon | Column Name | Description | | ----------------------- | -------------- | | id | Self-incrementing ID | | union_code | unique code | | version | Version | | name | Job name | | description | description | | task_type | Job type | | task_params | Job custom parameters | | run_flag | Run flag | | task_priority | Job priority | | worker_group | worker group | | fail_retry_times | Number of failed retries | | fail_retry_interval | Failure retry interval | | timeout_flag | Timeout flag | | timeout_notify_strategy | Timeout notification strategy | | timeout_duration | Timeout duration | | create_time | Creation time | | update_time | Modification time | #### 3.1.2 Task relation table: t_ds_task_relation | Column Name | Description | | ----------------------- | ------------------------- ------------- | | id | Self-incrementing ID | | union_code | unique code | | version | Version | | process_definition_code | Workflow coding | | node_code | Node code (workflow code/job code) | | post_node_code | Post node code (workflow code/job code) | | condition_type | Condition type 0: None 1: Judgment condition 2: Delay condition | | condition_params | Condition parameters | | create_time | Creation time | | update_time | Modification time | #### 3.1.3 Workflow definition table: t_ds_process_definithon | Column Name | Description | | ---- | ---- | | id | Self-incrementing ID | | union_code | unique code | | version | Version | | name | Workflow name | | project_code | Project code | | release_state | Release state | | user_id | Owning user ID | | description | description | | global_params | Global parameters | | flag | Whether the process is available: 0 is not available, 1 is available | | receivers | recipients | | receivers_cc | CC | | timeout | Timeout time | | tenant_id | tenant ID | | create_time | Creation time | | update_time | Modification time | #### 3.1.4 Job definition log table: t_ds_task_definithon_log Add operation type (add, modify, delete), operator, and operation time based on the job definition table #### 3.1.5 Job relation log table: t_ds_task_relation_log Add operation type (add, modify, delete), operator, and operation time based on the job relationship table #### 3.1.6 Workflow definition log table: t_ds_process_definithon_log Add operation type (add, modify, delete), operator, and operation time based on the workflow definition table ### 3.2 Frontend *The design here is just a personal idea, and the front-end help is needed to design the interaction* Need to add job management related functions, including: job list, job creation, update, delete, view details operations To create a workflow page, you need to split json into workflow definition data and job relationship data to the back-end API layer to save/update Workflow page, when dragging task nodes, add reference job options The conditional branch nodes and delay nodes need to be resolved into the conditional rule data in the relationship; conversely, the conditional rule data returned by the backend needs to be displayed as the corresponding node when querying the workflow ### 3.3 Master When the Master schedules the workflow, you need to modify <Build dag from json> to <Build dag from relational data>. When executing a workflow, first load the relational data in full (no job data is loaded here), generate DAG, and traverse DAG execution , And then get the job data that needs to be executed Other execution processes are consistent with existing processes -------------------------------------------------------------------------------------------------------------- ## 1.现状 当前DS系统的工作流定义包含了任务定义数据和任务之间关系数据,并且在数据库的设计上,任务数据和任务关系数据是以一个字符串类型字段(process_definition_json)的方式,保存在工作流定义表(t_ds_process_definition)中。 随着工作流和任务的增加,会产生如下问题: - 任务数据、关系数据和工作流数据耦合在一起,对单任务调度的场景不友好,任务必须创建在工作流内 - 任务无法复用,因为任务是创建在工作流内的 - 维护成本高,牵一发动全身,修改任何一个任务,都需要整体更新工作流内数据,同时也增加了日志成本 - 工作流内任务较多时,全局搜索和统计分析效率低,例如查询哪些任务用到了哪个数据源 - 扩展性差,例如未来要实现血缘功能,只会导致工作流定义越来越臃肿 - 任务、关系、工作流边界模糊,条件节点、延迟节点也被当做一种任务,实际是关系与条件的组合 基于以上痛点,我们需要重新定义任务、关系、工作流的业务边界,基于此重新设计它们的数据结构 ## 2.设计思路 ### 2.1 工作流、关系、作业 首先,我们抛开当前的实现,明确任务(后续描述更改为作业)、关系、工作流的业务边界,如何去解耦 - 作业:调度系统要执行的任务,作业内只包含执行作业所需要的数据和资源 - 关系:作业与作业之间的关系以及执行条件,包含执行关系(A完成后,执行B)和执行条件(A完成并成功后,执行B;A完成并失败后,执行C;A完成30分钟后,执行D) - 工作流:一组关系的载体,工作流只保存作业间的关系(DAG是工作流的一种展示形式,创建关系的一种方式) 结合当前DS支持的功能,我们可以做一个分类 - 作业:依赖检查、子流程、Shell、存储过程、Sql、Spark、Flink、MR、Python、Http、DataX、Sqoop - 关系:串行执行、并行执行、聚合执行、条件分支、延迟执行 - 工作流:调度执行的边界,包含一组关系 #### 2.1.1 进一步细化 作业定义数据,和当前的作业定义数据差别不大,都是由公共字段和自定义字段组成,只需要去掉关系相关的字段就可以了。 工作流定义数据,和当前的工作流定义数据差别也不大,去掉json字段就可以了。 关系数据,我们根据分类可以抽象为两个节点和一个路径。节点就是作业,路径包含前置节点到后置节点需要满足的条件规则是什么,条件规则包含:无条件、判断条件、延迟条件。 ### 2.2 版本管理 我们明确业务边界,解耦后它们之间就变成了引用关系,工作流和关系之间是一对多,关系和作业之间是一对多。不仅是定义数据,我们还要考虑实例数据,每次工作流的调度执行都会产生工作流实例,作业和工作流都是可以变更的,而工作流实例又要支持查看、重跑、恢复失败等。这就需要引入定义数据的版本管理了。每一次工作流、关系、作业变更都需要保存旧版本数据,生成新版本数据。 所以这里的设计思路是: 定义数据需要增加版本字段 定义表需要增加对应的日志表 创建定义数据时,双写到定义表和日志表,修改定义数据时,保存修改后的版本到日志表 定义表的引用数据中不需要保存版本信息(引用最新版本),实例数据中保存执行时的版本信息 ### 2.3 编码设计 这里还涉及到工作流、作业定义数据导入导出问题,根据之前社区讨论的方案,需要引入编码方案,工作流、关系、作业每条数据都会有一个唯一编码,相关Issue: https://github.com/apache/incubator-dolphinscheduler/issues/3820 资源:RESOURCE_xxx 作业:TASK_xxx 关系:RELATION_xxx 工作流:PROCESS_xxx 项目:PROJECT_xxx ## 3.设计方案 ### 3.1 表模型设计 #### 3.1.1 作业定义表:t_ds_task_definithon | 列名 | 描述 | | ----------------------- | -------------- | | id | 自增ID | | union_code | 唯一编码 | | version | 版本 | | name | 作业名称 | | description | 描述 | | task_type | 作业类型 | | task_params | 作业自定义参数 | | run_flag | 运行标志 | | task_priority | 作业优先级 | | worker_group | worker分组 | | fail_retry_times | 失败重试次数 | | fail_retry_interval | 失败重试间隔 | | timeout_flag | 超时标志 | | timeout_notify_strategy | 超时通知策略 | | timeout_duration | 超时时长 | | create_time | 创建时间 | | update_time | 修改时间 | #### 3.1.2 作业关系表:t_ds_task_relation | 列名 | 描述 | | ----------------------- | -------------------------------------- | | id | 自增ID | | union_code | 唯一编码 | | version | 版本 | | process_definition_code | 工作流编码 | | node_code | 节点编码(工作流编码/作业编码) | | post_node_code | 后置节点编码(工作流编码/作业编码) | | condition_type | 条件类型 0:无 1:判断条件 2:延迟条件 | | condition_params | 条件参数 | | create_time | 创建时间 | | update_time | 修改时间 | #### 3.1.3 工作流定义表:t_ds_process_definithon | 列名 | 描述 | | ---- | ---- | | id | 自增ID | | union_code | 唯一编码 | | version | 版本 | | name | 工作流名称 | | project_code | 项目编码 | | release_state | 发布状态 | | user_id | 所属用户ID | | description | 描述 | | global_params | 全局参数 | | flag | 流程是否可用:0 不可用,1 可用 | | receivers | 收件人 | | receivers_cc | 抄送人 | | timeout | 超时时间 | | tenant_id | 租户ID | | create_time | 创建时间 | | update_time | 修改时间 | #### 3.1.4 作业定义日志表:t_ds_task_definithon_log 作业定义表基础上增加操作类型(新增、修改、删除)、操作人、操作时间 #### 3.1.5 作业关系日志表:t_ds_task_relation_log 作业关系表基础上增加操作类型(新增、修改、删除)、操作人、操作时间 #### 3.1.6 工作流定义日志表:t_ds_process_definithon_log 工作流定义表基础上增加操作类型(新增、修改、删除)、操作人、操作时间 ### 3.2 前端 *这里的设计只是个人想法,交互上还需要前端帮助设计下* 需要增加作业管理相关功能,包括:作业列表,作业的创建、更新、删除、查看详情操作 创建工作流页面,需要将json拆分为工作流定义数据、作业关系数据传给后端API层保存/更新 工作流页面,拖拽任务节点时,增加引用作业选项 条件分支节点、延迟节点需要解析为关系中的条件规则数据;反之,查询工作流时需要将后端返回的条件规则数据展示为对应的节点 ### 3.3 Master Master调度工作流时,需要将<从json构建dag>修改为<从关系数据构建dag>,执行一个工作流时先全量加载关系数据(这里不加载作业数据),生成DAG,遍历DAG执行时,再获取需要执行的作业数据 其他执行流程和现有流程一致 -------------------- DolphinScheduler(Incubator) Commtter Hemin Wen 温合民 [email protected] --------------------
