I created a new branch (json_split) from dev for this discussion. If you want to join us, please leave a message on the issue page.
https://github.com/apache/incubator-dolphinscheduler/issues/4417

leon bao <[email protected]> wrote on Tue, Dec 29, 2020 at 4:46 PM:

Thank you all! Based on this discussion, I have put together a task list for the JSON split. I very much hope that people interested in implementing the coding work can join us. The issue:
https://github.com/apache/incubator-dolphinscheduler/issues/4325

Hemin Wen <[email protected]> wrote on Fri, Dec 4, 2020 at 2:24 PM:

Based on the results of the discussion, the plan has been re-optimized, along with the related development work.

## 1. Current state

In the current DS system, a workflow definition contains both the task definition data and the task relationship data. In the database design, the task data and task relationship data are stored as a string-typed field (process_definition_json) in the workflow definition table (t_ds_process_definition).

As workflows and tasks grow in number, the following problems arise:

- Task data, relationship data, and workflow data are coupled together, which is unfriendly to single-task scheduling: a task must be created inside a workflow.

- Tasks cannot be reused, because each task is created inside a workflow.

- Maintenance cost is high: modifying any single task requires updating the workflow data as a whole, which also increases log volume.

- When a workflow contains many tasks, global search and statistical analysis are inefficient, e.g. querying which tasks use a given data source.

- Scalability is poor; for example, implementing data lineage in the future would only make workflow definitions more and more bloated.

- The boundaries between tasks, relationships, and workflows are blurred. Condition nodes and delay nodes are treated as tasks, although they are really a combination of a relationship and a condition.

Given these pain points, we need to redefine the business boundaries of tasks, relationships, and workflows, and redesign their data structures accordingly.
## 2. Design Ideas

### 2.1 Workflow, relationship, job

First, setting the current implementation aside, we clarify the business boundaries of tasks (referred to as "jobs" from here on), relationships, and workflows, and how to decouple them:

- Job: the task that the scheduling system actually executes; a job contains only the data and resources needed to execute it.
- Relationship: the connection between jobs and its execution conditions, covering both the execution order (after A completes, execute B) and execution conditions (after A completes successfully, execute B; after A completes and fails, execute C; 30 minutes after A completes, execute D).
- Workflow: the carrier of a set of relationships; a workflow stores only the relationships between jobs (a DAG is one way to present a workflow and one way to create relationships).

Combined with the functions currently supported by DS, we can make a classification:

- Job: dependency check, sub-process, Shell, stored procedure, Sql, Spark, Flink, MR, Python, Http, DataX, Sqoop
- Relationship: serial execution, parallel execution, aggregate execution, conditional branch, delayed execution
- Workflow: the boundary of scheduled execution, containing a set of relationships

#### 2.1.1 Further refinement

Job definition data is not much different from the current task definition data; both consist of public fields and custom fields. We only need to remove the relationship-related fields.

Workflow definition data is likewise not much different from the current workflow definition data; we just remove the json field.

Relationship data can be abstracted, per the classification above, into two nodes and one path. The nodes are jobs; the path carries the condition rules that must be satisfied to go from the pre-node to the post-node. The condition rules are: unconditional, judgment condition, and delay condition.

### 2.2 Version Management

Once the business boundaries are clear and the entities are decoupled, they become reference relationships: workflow to relationship is one-to-many, and relationship to job is one-to-many. The definition data also needs to keep version records, so that historical data can be restored later.

So the design idea here is:

- Definition data needs an added version field.

- Each definition table needs a corresponding log table.

- When creating definition data, double-write to the definition table and the log table; when modifying definition data, save the modified version to the log table (sketched below).

- Reference data in the definition tables does not need to store version information (references point to the latest version).
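A minimal sketch of the create/update write paths described in 2.2, using in-memory stores in place of the real tables (all class and method names here are illustrative, not actual DolphinScheduler code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: simulates a definition table and its log table with
// in-memory maps to show the double-write idea from section 2.2.
public class VersionedDefinitionStore {
    record TaskDefinition(long code, int version, String params) {}

    private final Map<Long, TaskDefinition> definitionTable = new HashMap<>();
    private final List<TaskDefinition> definitionLogTable = new ArrayList<>();

    // Create: double-write the initial version to both tables.
    public void create(long code, String params) {
        TaskDefinition v1 = new TaskDefinition(code, 1, params);
        definitionTable.put(code, v1);
        definitionLogTable.add(v1);
    }

    // Update: bump the version, overwrite the definition table (which always
    // holds the latest version), and append the new version to the log table.
    public void update(long code, String newParams) {
        TaskDefinition current = definitionTable.get(code);
        TaskDefinition next = new TaskDefinition(code, current.version() + 1, newParams);
        definitionTable.put(code, next);
        definitionLogTable.add(next);
    }

    // Restoring history is possible because the log table keeps every version.
    public List<TaskDefinition> history(long code) {
        return definitionLogTable.stream().filter(d -> d.code() == code).toList();
    }

    public static void main(String[] args) {
        VersionedDefinitionStore store = new VersionedDefinitionStore();
        store.create(1001L, "{\"rawScript\":\"echo 1\"}");
        store.update(1001L, "{\"rawScript\":\"echo 2\"}");
        System.out.println(store.history(1001L)); // versions 1 and 2
    }
}
```

References elsewhere carry only the code, not the version, matching "references point to the latest version".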
### 2.3 Instance data

The current DB design already has a workflow instance table and a task instance table, and DS currently supports data changes in the instance tables. The instance tables cannot store only the code and version of the definition data; they also need to keep the detailed definition data. Therefore the workflow instance table needs to be split into a workflow instance table and a job relationship table, while the task instance table stays largely unchanged. The fields of the three instance tables are basically the same as those of the definition tables.

### 2.4 Business Identifier Design

This also involves importing and exporting workflow and job definition data. According to the earlier community discussion, a business identifier needs to be introduced: every row in the workflow definition table and the job definition table gets a business identifier, and relationship definition data, dependent jobs, and sub-workflow jobs establish references through this identifier. The concrete implementation of the business identifier awaits the vote on the design plan.

Related issue:
https://github.com/apache/incubator-dolphinscheduler/issues/3820

Design plan:

## 3. Design plan

### 3.1 Table model design

#### 3.1.1 Workflow definition table: t_ds_process_definition

| Column Name | Description |
| ---- | ---- |
| id | Auto-increment ID |
| code | Code (the original name field) |
| version | Version |
| description | Description |
| project_code | Project code |
| release_state | Release state |
| user_id | Owning user ID |
| global_params | Global parameters |
| flag | Whether the process is available: 0 unavailable, 1 available |
| receivers | Recipients |
| receivers_cc | CC recipients |
| timeout | Timeout |
| tenant_id | Tenant ID |
| locations | Node coordinate information |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.2 Workflow job relationship table: t_ds_process_task_relation

Note: the last node has no condition data and no post data. You can picture the two ends of a line: the left end is the pre-node, the middle is the condition, and the right end is the post-node.

| Column Name | Description |
| ----------------------- | -------------------------------------- |
| id | Auto-increment ID |
| project_code | Project code |
| process_definition_code | Workflow code |
| pre_project_code | Pre-referenced project code |
| pre_task_code | Pre-referenced job code |
| condition_type | Condition type 0: none 1: judgment 2: delay |
| condition_params | Condition parameters (json) |
| post_project_code | Post-referenced project code |
| post_task_code | Post-referenced job code |
| create_time | Creation time |
| update_time | Modification time |
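As an illustration of how the relation table above might hold a conditional branch (a hypothetical example; the codes, the condition_params JSON, and the pre_task_code = 0 convention for entry edges are assumptions, not the voted design):

```java
import java.util.List;

// Hypothetical rows of t_ds_process_task_relation for a conditional branch:
// A -> B when A succeeds, A -> C when A fails. All codes are made up.
public class RelationRowsExample {
    record RelationRow(long processDefinitionCode,
                       long preTaskCode,     // assumed 0 when there is no pre-node
                       int conditionType,    // 0: none, 1: judgment, 2: delay
                       String conditionParams,
                       long postTaskCode) {}

    public static void main(String[] args) {
        long workflow = 9000L, jobA = 1L, jobB = 2L, jobC = 3L;
        List<RelationRow> rows = List.of(
            // entry edge: nothing before A
            new RelationRow(workflow, 0, 0, null, jobA),
            // A -> B under a judgment condition (params are illustrative JSON)
            new RelationRow(workflow, jobA, 1, "{\"when\":\"SUCCESS\"}", jobB),
            // A -> C under a judgment condition
            new RelationRow(workflow, jobA, 1, "{\"when\":\"FAILURE\"}", jobC)
            // B and C are last nodes: per the note above, they carry no
            // condition data and no post data, so no further rows are needed.
        );
        rows.forEach(System.out::println);
    }
}
```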
#### 3.1.3 Job definition table: t_ds_task_definition

| Column Name | Description |
| ----------------------- | -------------- |
| id | Auto-increment ID |
| code | Code (the original name field) |
| version | Version |
| description | Description |
| project_code | Project code |
| task_type | Job type |
| task_params | Job custom parameters |
| run_flag | Run flag |
| task_priority | Job priority |
| worker_group | Worker group |
| fail_retry_times | Number of failed retries |
| fail_retry_interval | Failure retry interval |
| timeout_flag | Timeout flag |
| timeout_notify_strategy | Timeout notification strategy |
| timeout_duration | Timeout duration |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.4 Workflow definition log table: t_ds_process_definition_log

Adds operation type (add, modify, delete), operator, and operation time on top of the workflow definition table.

#### 3.1.5 Workflow job relationship log table: t_ds_process_task_relation_log

Adds workflow version, operation type (add, modify, delete), operator, and operation time on top of the job relationship table.

#### 3.1.6 Job definition log table: t_ds_task_definition_log

Adds operation type (add, modify, delete), operator, and operation time on top of the job definition table.

### 3.2 Master-Worker scheduling design

When the Master schedules a workflow, it queries the workflow details and all job relationship data by project code and workflow code (job data is not loaded at this point), generates the DAG, traverses the DAG's jobs, and sends the project code and job code to the Worker. The Worker then queries the detailed job data by project code and job code and executes the job.
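A rough sketch of this flow, under assumed names (RelationDao and WorkerClient are stand-ins, not the real DAO or RPC layer):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch of 3.2: the Master builds the DAG from relation rows
// only, and ships (projectCode, taskCode) pairs to Workers, which load the
// full job definition on their side. All names here are assumptions.
public class MasterWorkerSketch {
    record Relation(long preTaskCode, long postTaskCode) {}

    interface RelationDao { List<Relation> findByWorkflow(long projectCode, long workflowCode); }
    interface WorkerClient { void dispatch(long projectCode, long taskCode); }

    private final RelationDao relationDao;
    private final WorkerClient worker;

    MasterWorkerSketch(RelationDao relationDao, WorkerClient worker) {
        this.relationDao = relationDao;
        this.worker = worker;
    }

    // Master side: no job detail is loaded here, only relation data.
    void schedule(long projectCode, long workflowCode) {
        List<Relation> relations = relationDao.findByWorkflow(projectCode, workflowCode);

        // Group edges into an adjacency view of the DAG (pre -> posts).
        Map<Long, List<Long>> dag = relations.stream()
                .filter(r -> r.preTaskCode() != 0)
                .collect(Collectors.groupingBy(Relation::preTaskCode,
                        Collectors.mapping(Relation::postTaskCode, Collectors.toList())));

        // Entry jobs are post-nodes whose pre-node code is 0 (assumed convention).
        relations.stream()
                .filter(r -> r.preTaskCode() == 0)
                .forEach(r -> traverse(dag, projectCode, r.postTaskCode()));
    }

    // Simplified traversal: a real Master would wait for each job to finish
    // and evaluate path conditions before dispatching the successors.
    private void traverse(Map<Long, List<Long>> dag, long projectCode, long taskCode) {
        worker.dispatch(projectCode, taskCode); // Worker loads job detail by codes
        for (long next : dag.getOrDefault(taskCode, List.of())) {
            traverse(dag, projectCode, next);
        }
    }

    public static void main(String[] args) {
        RelationDao dao = (p, w) -> List.of(new Relation(0, 1), new Relation(1, 2), new Relation(1, 3));
        WorkerClient client = (p, t) -> System.out.println("dispatch task " + t);
        new MasterWorkerSketch(dao, client).schedule(100L, 9000L);
    }
}
```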
## 4. Related work split

### 4.1 Frontend

Add job management functions, including: job list, and job create, update, delete, and view-details operations.

On the create-workflow page, pass the workflow information, job relationship information, and job information to the backend API layer to save/update.

On the workflow page, when dragging task nodes, also support referencing a project job (by default, searching jobs under the current project) and creating a job.

### 4.2 API layer

Add processing interfaces for job data, including version handling (query, create, modify, delete, online/offline, ...).

Refactor the processing interfaces for workflow data, including version handling (query, create, modify, delete, import, export, online/offline, ...).

Refactor the processing interfaces for workflow instance data (query, modify, Gantt chart).

Refactor the job instance query interface.

Refactor the statistics interfaces for workflow instances and job instances (UI home page, project home page statistics, related monitoring data).

### 4.3 Master

Rebuild the Master according to the <3.2 Master-Worker scheduling design> scheme.

### 4.4 Worker

Refactor the Worker according to the <3.2 Master-Worker scheduling design> scheme.

--------------------
DolphinScheduler(Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

Hemin Wen <[email protected]> wrote on Wed, Nov 25, 2020 at 10:01 AM:

Hi!

About the JSON splitting of the workflow definition: the following is the design plan for splitting it into three tables.

Everyone can discuss it together.
## 1. Current state

In the current DS system, a workflow definition contains both the task definition data and the task relationship data. In the database design, the task data and task relationship data are stored as a string-typed field (process_definition_json) in the workflow definition table (t_ds_process_definition).

As workflows and tasks grow in number, the following problems arise:

- Task data, relationship data, and workflow data are coupled together, which is unfriendly to single-task scheduling: a task must be created inside a workflow.

- Tasks cannot be reused, because each task is created inside a workflow.

- Maintenance cost is high: modifying any single task requires updating the workflow data as a whole, which also increases log volume.

- When a workflow contains many tasks, global search and statistical analysis are inefficient, e.g. querying which tasks use a given data source.

- Scalability is poor; for example, implementing data lineage in the future would only make workflow definitions more and more bloated.

- The boundaries between tasks, relationships, and workflows are blurred. Condition nodes and delay nodes are treated as tasks, although they are really a combination of a relationship and a condition.

Given these pain points, we need to redefine the business boundaries of tasks, relationships, and workflows, and redesign their data structures accordingly.

## 2. Design Ideas

### 2.1 Workflow, relationship, job

First, setting the current implementation aside, we clarify the business boundaries of tasks (referred to as "jobs" from here on), relationships, and workflows, and how to decouple them:

- Job: the task that the scheduling system really needs to execute; a job contains only the data and resources needed to execute it.
- Relationship: the connection between jobs and its execution conditions, covering both the execution order (after A completes, execute B) and execution conditions (after A completes successfully, execute B; after A completes and fails, execute C; 30 minutes after A completes, execute D).
- Workflow: the carrier of a set of relationships; a workflow stores only the relationships between jobs (a DAG is one way to present a workflow and one way to create relationships).

Combined with the functions currently supported by DS, we can make a classification:

- Job: dependency check, sub-process, Shell, stored procedure, Sql, Spark, Flink, MR, Python, Http, DataX, Sqoop
- Relationship: serial execution, parallel execution, aggregate execution, conditional branch, delayed execution
- Workflow: the boundary of scheduled execution, containing a set of relationships
#### 2.1.1 Further refinement

Job definition data is not much different from the current task definition data; both consist of public fields and custom fields. We only need to remove the relationship-related fields.

Workflow definition data is likewise not much different from the current workflow definition data; we just remove the json field.

Relationship data can be abstracted, per the classification above, into two nodes and one path. The nodes are jobs, and the path carries the condition rules that must be satisfied to go from the pre-node to the post-node. The condition rules are: unconditional, judgment condition, and delay condition.

### 2.2 Version Management

Once the business boundaries are clear and the entities are decoupled, they become reference relationships: workflow to relationship is one-to-many, and relationship to job is one-to-many. Beyond the definition data, we also have to consider instance data. Every scheduled execution of a workflow produces a workflow instance; jobs and workflows can change, yet a workflow instance must support viewing, rerunning, failure recovery, and so on. This requires introducing version management for the definition data: every change to a workflow, relationship, or job must save the old version's data and generate a new version.

So the design idea here is:

- Definition data needs an added version field.

- Each definition table needs a corresponding log table.

- When creating definition data, double-write to the definition table and the log table; when modifying definition data, save the modified version to the log table.

- Reference data in the definition tables does not need to store version information (references point to the latest version); the version in effect at execution time is saved in the instance data.

### 2.3 Coding Design

This also involves importing and exporting workflow and job definition data. According to the earlier community discussion, a coding scheme needs to be introduced: every piece of workflow, relationship, and job data gets a unique code. Related issue:
https://github.com/apache/incubator-dolphinscheduler/issues/3820

Resource: RESOURCE_xxx

Task: TASK_xxx

Relation: RELATION_xxx

Workflow: PROCESS_xxx

Project: PROJECT_xxx
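A minimal sketch of what such type-prefixed unique codes could look like (purely illustrative; the actual scheme awaits the vote in the issue above):

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: two possible ways to mint type-prefixed unique codes
// (TASK_xxx, RELATION_xxx, ...). Not the actual DolphinScheduler scheme.
public final class UnionCodeGenerator {
    public enum EntityType { RESOURCE, TASK, RELATION, PROCESS, PROJECT }

    private static final AtomicLong SEQUENCE = new AtomicLong();

    // Variant 1: prefix + random UUID, unique across nodes without coordination.
    public static String randomCode(EntityType type) {
        return type.name() + "_" + UUID.randomUUID().toString().replace("-", "");
    }

    // Variant 2: prefix + timestamp + local sequence, roughly sortable by
    // creation time (would still need a machine id to be globally unique).
    public static String sequentialCode(EntityType type) {
        return type.name() + "_" + System.currentTimeMillis() + "_" + SEQUENCE.incrementAndGet();
    }

    public static void main(String[] args) {
        System.out.println(randomCode(EntityType.TASK));        // e.g. TASK_5f1c...
        System.out.println(sequentialCode(EntityType.PROCESS)); // e.g. PROCESS_1606...
    }
}
```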
## 3. Design plan

### 3.1 Table model design

#### 3.1.1 Job definition table: t_ds_task_definition

| Column Name | Description |
| ----------------------- | -------------- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| name | Job name |
| description | Description |
| task_type | Job type |
| task_params | Job custom parameters |
| run_flag | Run flag |
| task_priority | Job priority |
| worker_group | Worker group |
| fail_retry_times | Number of failed retries |
| fail_retry_interval | Failure retry interval |
| timeout_flag | Timeout flag |
| timeout_notify_strategy | Timeout notification strategy |
| timeout_duration | Timeout duration |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.2 Task relation table: t_ds_task_relation

| Column Name | Description |
| ----------------------- | -------------------------------------- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| process_definition_code | Workflow code |
| node_code | Node code (workflow code/job code) |
| post_node_code | Post-node code (workflow code/job code) |
| condition_type | Condition type 0: none 1: judgment condition 2: delay condition |
| condition_params | Condition parameters |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.3 Workflow definition table: t_ds_process_definition

| Column Name | Description |
| ---- | ---- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| name | Workflow name |
| project_code | Project code |
| release_state | Release state |
| user_id | Owning user ID |
| description | Description |
| global_params | Global parameters |
| flag | Whether the process is available: 0 unavailable, 1 available |
| receivers | Recipients |
| receivers_cc | CC recipients |
| timeout | Timeout |
| tenant_id | Tenant ID |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.4 Job definition log table: t_ds_task_definition_log

Adds operation type (add, modify, delete), operator, and operation time on top of the job definition table.

#### 3.1.5 Job relation log table: t_ds_task_relation_log

Adds operation type (add, modify, delete), operator, and operation time on top of the job relation table.

#### 3.1.6 Workflow definition log table: t_ds_process_definition_log

Adds operation type (add, modify, delete), operator, and operation time on top of the workflow definition table.
### 3.2 Frontend

*The design here is just a personal idea; frontend help is needed to design the interaction.*

Job management functions need to be added, including: job list, and job create, update, delete, and view-details operations.

On the create-workflow page, the json needs to be split into workflow definition data and job relationship data before being passed to the backend API layer to save/update.

On the workflow page, when dragging task nodes, add a reference-job option.

Conditional branch nodes and delay nodes need to be resolved into condition rule data in the relationships; conversely, when querying a workflow, the condition rule data returned by the backend needs to be rendered as the corresponding nodes (sketched below).
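To make that node-to-rule mapping concrete, here is a hypothetical sketch (the real frontend would be JavaScript; the names and the condition_params format are assumptions):

```java
import java.util.List;

// Hypothetical sketch of the two-way mapping from 3.2: a conditional branch
// node in the DAG editor becomes condition-rule edges in the relation data,
// and such edges are rendered back as a condition node when loading.
public class ConditionNodeMapping {
    record ConditionNode(long preJobCode, long successJobCode, long failureJobCode) {}
    record RelationEdge(long nodeCode, long postNodeCode, int conditionType, String conditionParams) {}

    // Editor -> backend: resolve the visual condition node into two
    // judgment-condition edges (condition_type 1 in the relation table).
    static List<RelationEdge> toRelations(ConditionNode node) {
        return List.of(
            new RelationEdge(node.preJobCode(), node.successJobCode(), 1, "{\"when\":\"SUCCESS\"}"),
            new RelationEdge(node.preJobCode(), node.failureJobCode(), 1, "{\"when\":\"FAILURE\"}"));
    }

    // Backend -> editor: a pair of judgment edges sharing one pre-node is
    // displayed again as a single condition node.
    static ConditionNode toNode(RelationEdge success, RelationEdge failure) {
        return new ConditionNode(success.nodeCode(), success.postNodeCode(), failure.postNodeCode());
    }

    public static void main(String[] args) {
        toRelations(new ConditionNode(1L, 2L, 3L)).forEach(System.out::println);
    }
}
```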
### 3.3 Master

When the Master schedules a workflow, <build the DAG from json> needs to change to <build the DAG from relational data>. When executing a workflow, first load all the relationship data (job data is not loaded here), generate the DAG, and then, while traversing the DAG for execution, fetch the job data that needs to be executed.

The rest of the execution flow is consistent with the existing flow.

--------------------
DolphinScheduler(Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

--
DolphinScheduler(Incubator) PPMC
BaoLiang 鲍亮
[email protected]
