There are two options for when to query the job details:

- The Master is only responsible for DAG scheduling. When a job is due to run, it sends the job code to a Worker, and the Worker queries the job details and executes the job.
- The Master performs DAG scheduling and queries the job details when a job is due to run, then sends the details to a Worker, which executes the job.
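To make the trade-off concrete, here is a minimal sketch of the two dispatch flows. All names (`JobStore`, `WorkerClient`, `dispatchByCode`, `dispatchWithDetail`) are hypothetical illustrations of where the detail query happens, not existing DolphinScheduler APIs:

```java
// Hypothetical sketch contrasting the two dispatch options; not actual DolphinScheduler code.
public class DispatchSketch {

    record JobDetail(String code, String type, String params) {} // assumed job payload

    interface JobStore {                        // assumed lookup for job definitions
        JobDetail queryByCode(String jobCode);
    }

    interface WorkerClient {                    // assumed RPC channel to a worker
        void send(Object payload);
    }

    // Option 1: the Master sends only the job code; the Worker queries the detail itself.
    static void dispatchByCode(WorkerClient worker, String jobCode) {
        worker.send(jobCode);                   // Worker calls JobStore.queryByCode on its side
    }

    // Option 2: the Master queries the detail first and ships the full definition.
    static void dispatchWithDetail(JobStore store, WorkerClient worker, String jobCode) {
        JobDetail detail = store.queryByCode(jobCode); // DB access stays on the Master
        worker.send(detail);                    // Worker receives everything it needs to run
    }
}
```

Roughly, option 1 spreads the database load across Workers but requires every Worker to reach the metadata store, while option 2 keeps database access on the Master at the cost of a larger dispatch payload.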
--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

Hemin Wen <[email protected]> wrote on Wed, Nov 25, 2020, at 10:01 AM:

Hi!

Regarding the JSON splitting of the workflow definition, the following is the design plan for splitting it into three tables.

Everyone is welcome to discuss it together.

--------------------------------------------------------------------------------------------------------------

## 1. Current state

The workflow definition in the current DS system includes both task definition data and task relationship data. In the database design, task data and task relationship data are stored as a string-type field (process_definition_json) in the workflow definition table (t_ds_process_definition).

As workflows and tasks grow in number, the following problems arise:

- Task data, relationship data, and workflow data are coupled together, which is unfriendly to single-task scheduling scenarios: a task must be created inside a workflow.
- Tasks cannot be reused, because a task is created inside a workflow.
- Maintenance cost is high. Changing one task has a ripple effect: modifying any task requires updating the workflow data as a whole, which also increases the logging cost.
- When a workflow contains many tasks, global search and statistical analysis are inefficient, for example querying which tasks use a given data source.
- Extensibility is poor. For example, implementing a data lineage feature in the future would only make the workflow definition more and more bloated.
- The boundaries between tasks, relationships, and workflows are blurred. Condition nodes and delay nodes are treated as a kind of task, while they are actually combinations of relationships and conditions.

Based on the above pain points, we need to redefine the business boundaries of tasks, relationships, and workflows, and redesign their data structures accordingly.

## 2. Design Ideas
### 2.1 Workflow, relationship, job

First, let us set aside the current implementation and clarify the business boundaries of tasks (referred to as "jobs" from here on), relationships, and workflows, and how to decouple them:

- Job: the task that the scheduling system actually needs to execute. A job contains only the data and resources needed to execute it.
- Relationship: the relationship between jobs and the conditions for execution, including execution order (after A completes, execute B) and execution conditions (after A completes successfully, execute B; after A completes and fails, execute C; 30 minutes after A completes, execute D).
- Workflow: the carrier of a set of relationships. A workflow only stores the relationships between jobs (a DAG is one way of displaying a workflow and one way of creating relationships).

Combined with the functions currently supported by DS, we can classify as follows:

- Jobs: dependency check, sub-process, Shell, stored procedure, Sql, Spark, Flink, MR, Python, Http, DataX, Sqoop
- Relationships: serial execution, parallel execution, aggregated execution, conditional branch, delayed execution
- Workflow: the boundary of scheduled execution, containing a set of relationships

#### 2.1.1 Further refinement

The job definition data is not much different from the current task definition data. Both consist of public fields and custom fields; we only need to remove the relationship-related fields.

The workflow definition data is not much different from the current workflow definition data; we just remove the json field.

Relationship data, according to the classification above, can be abstracted into two nodes and one path. A node is a job, and the path carries the condition rule that must be satisfied to go from the predecessor node to the successor node. Condition rules include: unconditional, judgment condition, and delay condition.

### 2.2 Version Management

Once we clarify the business boundaries and decouple them, they become reference relationships: workflow to relationship is one-to-many, and relationship to job is one-to-many. Besides definition data, we also need to consider instance data. Every scheduled execution of a workflow produces a workflow instance; jobs and workflows can change, yet a workflow instance must still support viewing, rerunning, failure recovery, etc. This requires version management of the definition data: every change to a workflow, relationship, or job must save the old version data and generate new version data.

So the design idea here is:

- Definition data gets an added version field.
- Each definition table gets a corresponding log table.
- When creating definition data, double-write to the definition table and the log table; when modifying definition data, save the modified version to the log table.
- Reference data in the definition tables does not store version information (it references the latest version); instance data stores the version information in effect at execution time.

### 2.3 Coding Design

This also involves importing and exporting workflow and job definition data. Following an earlier community discussion, a coding scheme needs to be introduced: every workflow, relationship, and job record will have a unique code. Related issue: https://github.com/apache/incubator-dolphinscheduler/issues/3820

Resource: RESOURCE_xxx

Task: TASK_xxx

Relation: RELATION_xxx

Workflow: PROCESS_xxx

Project: PROJECT_xxx
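As an illustration of the coding scheme, here is a minimal sketch that builds codes from the type prefixes above plus a unique suffix. The suffix strategy (a UUID here) is my assumption; the proposal does not fix one:

```java
import java.util.UUID;

// Hypothetical generator for the union codes proposed above; the UUID suffix
// is an assumption standing in for whatever unique-ID scheme is chosen.
public final class UnionCodeGenerator {
    public enum Kind { RESOURCE, TASK, RELATION, PROCESS, PROJECT }

    public static String nextCode(Kind kind) {
        String suffix = UUID.randomUUID().toString().replace("-", "");
        return kind.name() + "_" + suffix;       // e.g. TASK_3f2a9c...
    }

    public static void main(String[] args) {
        System.out.println(nextCode(Kind.TASK));     // TASK_xxx
        System.out.println(nextCode(Kind.PROCESS));  // PROCESS_xxx
    }
}
```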
## 3. Design plan

### 3.1 Table model design

#### 3.1.1 Job definition table: t_ds_task_definition

| Column Name | Description |
| ----------------------- | -------------- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| name | Job name |
| description | Description |
| task_type | Job type |
| task_params | Job custom parameters |
| run_flag | Run flag |
| task_priority | Job priority |
| worker_group | Worker group |
| fail_retry_times | Number of failure retries |
| fail_retry_interval | Failure retry interval |
| timeout_flag | Timeout flag |
| timeout_notify_strategy | Timeout notification strategy |
| timeout_duration | Timeout duration |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.2 Job relation table: t_ds_task_relation

| Column Name | Description |
| ----------------------- | ---------------------------------------- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| process_definition_code | Workflow code |
| node_code | Node code (workflow code / job code) |
| post_node_code | Post node code (workflow code / job code) |
| condition_type | Condition type 0: none, 1: judgment condition, 2: delay condition |
| condition_params | Condition parameters |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.3 Workflow definition table: t_ds_process_definition

| Column Name | Description |
| ---- | ---- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| name | Workflow name |
| project_code | Project code |
| release_state | Release state |
| user_id | Owning user ID |
| description | Description |
| global_params | Global parameters |
| flag | Whether the process is available: 0 not available, 1 available |
| receivers | Recipients |
| receivers_cc | CC recipients |
| timeout | Timeout |
| tenant_id | Tenant ID |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.4 Job definition log table: t_ds_task_definition_log

Same as the job definition table, plus operation type (add, modify, delete), operator, and operation time.

#### 3.1.5 Job relation log table: t_ds_task_relation_log

Same as the job relation table, plus operation type (add, modify, delete), operator, and operation time.

#### 3.1.6 Workflow definition log table: t_ds_process_definition_log

Same as the workflow definition table, plus operation type (add, modify, delete), operator, and operation time.
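To show how a row of the relation table above might look in code, here is a minimal sketch. The class and the example condition_params payload are hypothetical; only the field names follow the table:

```java
// Hypothetical mapping of a t_ds_task_relation row; field names follow the table above.
public class TaskRelation {
    public enum ConditionType { NONE, JUDGMENT, DELAY } // condition_type 0 / 1 / 2

    String unionCode;              // union_code: unique code of this relation
    int version;                   // version of the relation record
    String processDefinitionCode;  // workflow this relation belongs to
    String nodeCode;               // node code (workflow code / job code)
    String postNodeCode;           // post node code (workflow code / job code)
    ConditionType conditionType;   // 0: none, 1: judgment condition, 2: delay condition
    String conditionParams;        // assumed JSON text, e.g. {"delayMinutes": 30} for a delay
}
```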
### 3.2 Frontend

*The design here is just a personal idea; front-end help is needed to design the interaction.*

Job management functions need to be added, including: a job list, plus create, update, delete, and view-details operations for jobs.

On the workflow creation page, the json needs to be split into workflow definition data and job relationship data before being passed to the back-end API layer for saving/updating.

On the workflow page, when dragging task nodes, add an option to reference an existing job.

Conditional branch nodes and delay nodes need to be resolved into condition rule data in the relationships; conversely, when querying a workflow, the condition rule data returned by the backend needs to be displayed as the corresponding nodes.

### 3.3 Master

When the Master schedules a workflow, <build DAG from json> needs to be changed to <build DAG from relational data>. When executing a workflow, first load the relational data in full (no job data is loaded at this point) and generate the DAG; then, while traversing the DAG for execution, fetch the job data that needs to be executed, as sketched below.

The rest of the execution process is consistent with the existing process.
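A minimal sketch of this changed Master flow, reusing the TaskRelation sketch above. The DAO and DAG types are assumptions for illustration, not existing DolphinScheduler interfaces:

```java
import java.util.List;

// Hypothetical sketch of <build DAG from relational data>; all types are illustrative.
public class MasterSketch {
    interface RelationDao { List<TaskRelation> findByProcessCode(String processCode); }
    interface TaskDao     { Object findLatestByCode(String taskCode); }
    interface Dag {
        void addEdge(String pre, String post);
        Iterable<String> topologicalOrder();
    }

    static void runProcess(String processCode, RelationDao relations, TaskDao tasks, Dag dag) {
        // 1. Load only the relation rows -- no job data yet.
        for (TaskRelation r : relations.findByProcessCode(processCode)) {
            dag.addEdge(r.nodeCode, r.postNodeCode);
        }
        // 2. Walk the DAG; fetch each job definition only when it is about to run.
        for (String taskCode : dag.topologicalOrder()) {
            Object jobDetail = tasks.findLatestByCode(taskCode);
            dispatch(jobDetail);               // hand off to a worker (not shown)
        }
    }

    static void dispatch(Object jobDetail) { /* send to worker */ }
}
```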
--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------
