hi:

Next, I think we can analyze the impact, split up the tasks, and estimate the workload.

--------------------------------------
BoYi Zhang
E-mail: [email protected]

On 11/30/2020 11:33, Hemin Wen <[email protected]> wrote:

1. Re "I think you'd better talk to the front end; if you implement it the way you describe, it's a bit more difficult for the front end": the data structures for conditional nodes and delay nodes can be generated by the back end, so the front end does not need to handle them.
2. I think there is no problem keeping the 'locations' field.

--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

boyi <[email protected]> wrote on Monday, November 30, 2020 at 11:20:

hi:

> According to the classification of nodes in the design plan, only dependent nodes and condition nodes belong to the relationship, which is stored in the condition_type and condition_params fields. The UI can generate dependent nodes and condition nodes based on the values of these two fields in the relationship table, so the UI stays consistent with the current behavior.

I think you'd better talk to the front end. If you implement it the way you describe, it is a bit more difficult for the front end, and the front end generally should not have to handle much business logic.

> The current front-end workflow display supports automatic formatting of node coordinates, so my idea is that there is no need to store the location field. This depends on the front-end interaction design, and it can be saved if necessary.

The current version records both the connection relationships and the coordinates of each task; see the 'locations' and 'connects' fields in the 't_ds_process_definition' table. This lets users arrange the layout as they need. In complex cases the automatic formatting still needs further optimization.

--------------------------------------
BoYi Zhang
E-mail: [email protected]

On 11/30/2020 10:18, Hemin Wen <[email protected]> wrote:

Sorry, the previous description was wrong. Only conditional nodes and delay nodes belong to relationships; dependent nodes belong to tasks.

--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

Hemin Wen <[email protected]> wrote on Monday, November 30, 2020 at 09:56:

According to the classification of nodes in the design plan, only dependent nodes and condition nodes belong to the relationship, which is stored in the condition_type and condition_params fields. The UI can generate dependent nodes and condition nodes based on the values of these two fields in the relationship table, so the UI stays consistent with the current behavior.

The current front-end workflow display supports automatic formatting of node coordinates, so my idea is that there is no need to store the location field. This depends on the front-end interaction design, and it can be saved if necessary.

--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------
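For illustration only, a minimal sketch of the "the back end generates the condition/delay nodes" idea discussed above: the back end expands relation rows carrying condition_type/condition_params into the extra nodes the front end displays, so the front end only maps data to the UI. All class names here are hypothetical, not the actual DolphinScheduler code.

```java
import java.util.ArrayList;
import java.util.List;

class RelationRow {
    String nodeCode;         // pre node (job code, or workflow code for the entry edge)
    String postNodeCode;     // post node (job code)
    int conditionType;       // 0: none, 1: judgment condition, 2: delay condition
    String conditionParams;  // condition rule, e.g. a small JSON string
}

class UiNode {
    String code;
    String type;             // "CONDITION" or "DELAY" for synthesized nodes
    String params;

    UiNode(String code, String type, String params) {
        this.code = code;
        this.type = type;
        this.params = params;
    }
}

class RelationUiExpander {
    /** Synthesize one extra UI node for every conditional or delayed edge. */
    static List<UiNode> expand(List<RelationRow> relations) {
        List<UiNode> nodes = new ArrayList<>();
        for (RelationRow r : relations) {
            if (r.conditionType == 1) {
                nodes.add(new UiNode(r.nodeCode + "->" + r.postNodeCode, "CONDITION", r.conditionParams));
            } else if (r.conditionType == 2) {
                nodes.add(new UiNode(r.nodeCode + "->" + r.postNodeCode, "DELAY", r.conditionParams));
            }
            // conditionType == 0 is a plain edge and needs no extra node
        }
        return nodes;
    }
}
```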
boyi <[email protected]> wrote on Saturday, November 28, 2020 at 22:21:

hi:

> re: 2. The front-end UI is the same as it is now; the connections do not require logical operations, only a mapping between the UI and the data.

The t_ds_task_relation table not only identifies the connection relationships, it also carries business meaning, for example the condition_params field. How is that displayed in the front-end UI?

In addition, the task table t_ds_task_definition is missing a "location" field for storing the coordinate information of the front-end DAG diagram, i.e. the position of the task on the DAG canvas.

--------------------------------------
BoYi Zhang
E-mail: [email protected]

--------- Forwarded Message ---------
From: Hemin Wen <[email protected]>
Date: 11/28/2020 20:32
To: dev <[email protected]>
Subject: Re: [DISCUSS] Process definition json split design

1. Hmm, this point I really had not thought of. I don't know why it was designed this way before. I think the purpose of editing a workflow instance is to update the workflow definition; internally we have always used it that way, and there is no operation that only updates the instance (which would also cause the instance and the definition to diverge). So if that is the case, updating a workflow instance should be handled the same way as updating the workflow definition, or the entry for updating workflow instances should simply be closed.
2. The front-end UI is the same as it is now; the connections do not require logical operations, only a mapping between the UI and the data.
3. Yes, this is the current design.

--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

boyi <[email protected]> wrote on Saturday, November 28, 2020 at 15:36:

hi:

Three questions need to be confirmed:

1. Suppose the workflow instance has a field that records the version-log relationship; if the instance is edited, is a new row simply added to the version log table?
2. I understand that the task execution logic is saved on the connections, which needs support from the front-end UI. The connections in the current version do not support logical operations, so we need to confirm with the front end whether this can be implemented.
3. Can the task details be fetched from the database by the worker? This would change the interaction logic between the master and the worker and touch the underlying communication mechanism. The master is currently being refactored; if possible, it would be better to have the worker query the task details, which would reduce the communication pressure on the master.

--------------------------------------
BoYi Zhang
E-mail: [email protected]
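A purely illustrative sketch of question 3 above (and of the first plan discussed below): the Master dispatches only the job's code and version, and the Worker resolves the full definition itself. The class and field names are assumptions, not the actual master/worker RPC of DolphinScheduler.

```java
// Hypothetical dispatch payload: no task details travel over the wire,
// only enough to let the Worker look the definition up itself.
class TaskDispatchCommand {
    final long processInstanceId;
    final String taskCode;   // union code of the job definition, e.g. "TASK_xxx"
    final int taskVersion;   // version recorded when the instance was scheduled

    TaskDispatchCommand(long processInstanceId, String taskCode, int taskVersion) {
        this.processInstanceId = processInstanceId;
        this.taskCode = taskCode;
        this.taskVersion = taskVersion;
    }
}
```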
On 11/27/2020 17:44, Hemin Wen <[email protected]> wrote:

Plan: the Master is only responsible for DAG scheduling. When a job is to be executed, it sends the job code to the Worker, and the Worker is responsible for querying the job details and executing the job.
The Master only saves the workflow data code and version and the relation data code and version when scheduling the workflow; the Worker saves the code and version of the job when executing it.
Instance data does not need to store the detailed information. The design plan contains version log tables for workflows, relations, and jobs, and the version log tables can be joined when viewing an instance.

Plan: the Master performs the DAG scheduling, queries the job details when a job is to be executed, and then sends them to the Worker to execute the job.
The reason for not querying everything at once here is that when there are very many jobs, loading them all at once has a noticeable memory cost.

--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

boyi <[email protected]> wrote on Friday, November 27, 2020 at 17:04:

hi:

Both of them have defects:

- "The Master is only responsible for DAG scheduling. When a job is to be executed, it sends the job code to the Worker, and the Worker is responsible for querying the job details and executing the job."
  When the Master saves the workflow instance, it needs to save the snapshot information of the current workflow, which still requires a join query or a two-step query, so the Master still queries the task details. The Worker then queries the task details again when it executes. Having both the Master and the Worker query the same data is unreasonable.

- "The Master performs the DAG scheduling, queries the job details when a job is to be executed, and then sends them to the Worker to execute the job."
  If the Master is the one querying the task details, why not fetch the whole workflow definition in one go instead of querying piece by piece? For example, if a workflow contains 1000 tasks, it should query the 1000 tasks once rather than hitting the database 1000 times.

Either way, because the definition is split into three tables, there is a cost to database performance. This needs to be backed by a test report, e.g. the impact on the database and how much task latency is affected.

--------------------------------------
BoYi Zhang
E-mail: [email protected]
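A minimal sketch of the batch-loading point made above. The entity and mapper interface are hypothetical, not the actual DolphinScheduler DAO layer; they only contrast one lookup per task with one lookup per workflow.

```java
import java.util.Collection;
import java.util.List;

class TaskDefinition {
    String unionCode;
    int version;
    String taskType;
    String taskParams;
}

interface TaskDefinitionMapper {
    // One point lookup per task: a 1000-task workflow costs 1000 database round trips.
    TaskDefinition selectByCode(String unionCode);

    // The alternative argued for above: resolve every task of the workflow in a single
    // query, e.g. "... WHERE union_code IN (...)".
    List<TaskDefinition> selectByCodes(Collection<String> unionCodes);
}
```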
On 11/27/2020 15:53, Hemin Wen <[email protected]> wrote:

There are two solutions for the query timing of the job details:

- The Master is only responsible for DAG scheduling. When a job is to be executed, it sends the job code to the Worker, and the Worker is responsible for querying the job details and executing the job.
- The Master performs the DAG scheduling, queries the job details when a job is to be executed, and then sends them to the Worker to execute the job.

--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------

Hemin Wen <[email protected]> wrote on Wednesday, November 25, 2020 at 10:01:

Hi!

About the json splitting of the workflow definition, the following is the design plan for splitting it into three tables. Everyone can discuss it together.

--------------------------------------------------------------------------------------------------------------

## 1. Current situation

The workflow definition of the current DS system includes both task definition data and task relationship data. In the database design, the task data and the task relationship data are stored as a single string-type field (process_definition_json) in the workflow definition table (t_ds_process_definition).

As the number of workflows and tasks grows, the following problems arise:

- Task data, relation data, and workflow data are coupled together, which is unfriendly to single-task scheduling scenarios: a task must be created inside a workflow.
- Tasks cannot be reused, because a task is created inside a workflow.
- Maintenance cost is high. Changing any task requires updating the workflow data as a whole, which also increases the log cost.
- When a workflow contains many tasks, global search and statistical analysis are inefficient, e.g. querying which tasks use a given data source.
- Poor extensibility. For example, implementing data lineage in the future would only make the workflow definition more and more bloated.
- The boundaries between tasks, relations, and workflows are blurred. Condition nodes and delay nodes are also treated as tasks, while they are actually a combination of a relation and a condition.

Based on the above pain points, we need to redefine the business boundaries of tasks, relations, and workflows, and redesign their data structures accordingly.
## 2. Design Ideas

### 2.1 Workflow, relation, job

First of all, let us set aside the current implementation and clarify the business boundaries of tasks (referred to as jobs in the rest of this description), relations, and workflows, and how to decouple them.

- Job: the task that the scheduling system actually needs to execute; a job only contains the data and resources needed to execute it.
- Relation: the relationship between jobs and the execution conditions, including the execution relationship (after A completes, execute B) and execution conditions (after A completes and succeeds, execute B; after A completes and fails, execute C; 30 minutes after A completes, execute D).
- Workflow: the carrier of a set of relations; the workflow only saves the relations between jobs (a DAG is one way of displaying a workflow and one way of creating relations).

Combined with the functions currently supported by DS, we can classify as follows:

- Job: dependency check, sub-process, Shell, stored procedure, Sql, Spark, Flink, MR, Python, Http, DataX, Sqoop
- Relation: serial execution, parallel execution, aggregated execution, conditional branch, delayed execution
- Workflow: the boundary of scheduled execution, containing a set of relations

#### 2.1.1 Further refinement

The job definition data is not much different from the current job definition data. Both consist of public fields and custom fields; we only need to remove the relation-related fields.

The workflow definition data is not much different from the current workflow definition data either; we just remove the json field.

Relation data can be abstracted, according to the classification above, into two nodes and one path. The nodes are jobs, and the path carries the condition rule that must be satisfied to go from the pre-node to the post-node. Condition rules include: unconditional, judgment condition, and delay condition.

### 2.2 Version Management

Once we clarify the business boundaries and decouple them, they become reference relationships: workflow to relation is one-to-many, and relation to job is one-to-many. Besides the definition data, we also have to consider instance data. Every scheduled execution of a workflow produces a workflow instance; jobs and workflows can change, yet the workflow instance must still support viewing, rerun, failure recovery, and so on. This requires introducing version management for the definition data: every change to a workflow, relation, or job needs to save the old version data and generate new version data.

So the design idea here is:

- The definition data needs an added version field.
- Each definition table needs a corresponding log table.
- When creating definition data, double-write to the definition table and the log table; when modifying definition data, save the modified version to the log table.
- The reference data in the definition tables does not need to store version information (it references the latest version); the instance data stores the version information in effect at execution time.

### 2.3 Coding Design

This also involves importing and exporting workflow and job definition data. According to the earlier community discussion, a coding scheme needs to be introduced: every workflow, relation, and job record will have a unique code. Related issue: https://github.com/apache/incubator-dolphinscheduler/issues/3820

- Resource: RESOURCE_xxx
- Task: TASK_xxx
- Relation: RELATION_xxx
- Workflow: PROCESS_xxx
- Project: PROJECT_xxx
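A minimal sketch, with hypothetical DAO and entity names, of how the double-write in 2.2 could combine with the prefixed unique codes in 2.3. This is illustration only, not the actual DolphinScheduler service layer.

```java
import java.util.UUID;

class TaskDefinitionEntity {
    String unionCode;   // e.g. "TASK_xxx"
    int version;
    String name;
    String taskParams;
}

interface TaskDefinitionDao {
    void insertDefinition(TaskDefinitionEntity task);                                      // definition table (latest version)
    void insertDefinitionLog(TaskDefinitionEntity task, String opType, String operator);   // log table
    void updateDefinition(TaskDefinitionEntity task);
}

class TaskDefinitionService {
    private final TaskDefinitionDao dao;

    TaskDefinitionService(TaskDefinitionDao dao) {
        this.dao = dao;
    }

    String create(TaskDefinitionEntity task, String operator) {
        task.unionCode = "TASK_" + UUID.randomUUID();       // unique code with the TASK_ prefix
        task.version = 1;
        dao.insertDefinition(task);                         // double-write on create:
        dao.insertDefinitionLog(task, "ADD", operator);     // definition table plus log table
        return task.unionCode;
    }

    void update(TaskDefinitionEntity task, String operator) {
        task.version = task.version + 1;                    // every change produces a new version
        dao.insertDefinitionLog(task, "MODIFY", operator);  // the modified version goes to the log table
        dao.updateDefinition(task);                         // the definition table keeps only the latest
    }
}
```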
## 3. Design plan

### 3.1 Table model design

#### 3.1.1 Job definition table: t_ds_task_definition

| Column name | Description |
| --- | --- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| name | Job name |
| description | Description |
| task_type | Job type |
| task_params | Job custom parameters |
| run_flag | Run flag |
| task_priority | Job priority |
| worker_group | Worker group |
| fail_retry_times | Number of retries on failure |
| fail_retry_interval | Retry interval on failure |
| timeout_flag | Timeout flag |
| timeout_notify_strategy | Timeout notification strategy |
| timeout_duration | Timeout duration |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.2 Job relation table: t_ds_task_relation

| Column name | Description |
| --- | --- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| process_definition_code | Workflow code |
| node_code | Node code (workflow code / job code) |
| post_node_code | Post-node code (workflow code / job code) |
| condition_type | Condition type. 0: none, 1: judgment condition, 2: delay condition |
| condition_params | Condition parameters |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.3 Workflow definition table: t_ds_process_definition

| Column name | Description |
| --- | --- |
| id | Auto-increment ID |
| union_code | Unique code |
| version | Version |
| name | Workflow name |
| project_code | Project code |
| release_state | Release state |
| user_id | Owning user ID |
| description | Description |
| global_params | Global parameters |
| flag | Whether the process is available. 0: not available, 1: available |
| receivers | Recipients |
| receivers_cc | CC recipients |
| timeout | Timeout duration |
| tenant_id | Tenant ID |
| create_time | Creation time |
| update_time | Modification time |

#### 3.1.4 Job definition log table: t_ds_task_definition_log

The job definition table plus operation type (add, modify, delete), operator, and operation time.

#### 3.1.5 Job relation log table: t_ds_task_relation_log

The job relation table plus operation type (add, modify, delete), operator, and operation time.

#### 3.1.6 Workflow definition log table: t_ds_process_definition_log

The workflow definition table plus operation type (add, modify, delete), operator, and operation time.
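For illustration, how a small workflow (an entry edge into A, then A to B on success and A to C on failure) might be stored as rows of the proposed t_ds_task_relation table. The record simply mirrors the columns above; the entry-edge reading of node_code and the condition_params JSON shape are assumptions, not a defined format.

```java
import java.util.List;

record TaskRelationRow(
        String processDefinitionCode,
        String nodeCode,        // pre node: workflow code for an entry edge, job code otherwise
        String postNodeCode,    // post node: job code
        int conditionType,      // 0: none, 1: judgment condition, 2: delay condition
        String conditionParams) {
}

class RelationRowsExample {
    static final List<TaskRelationRow> ROWS = List.of(
            // a possible entry edge: the workflow code pointing at the first job
            new TaskRelationRow("PROCESS_1", "PROCESS_1", "TASK_A", 0, "{}"),
            // the conditional branch lives on the edges instead of being a separate task
            new TaskRelationRow("PROCESS_1", "TASK_A", "TASK_B", 1, "{\"when\":\"success\"}"),
            new TaskRelationRow("PROCESS_1", "TASK_A", "TASK_C", 1, "{\"when\":\"failure\"}"));
}
```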
### 3.2 Frontend

*The design here is just a personal idea; the interaction still needs to be designed with the front end's help.*

- Job management functions need to be added, including: job list, and job create, update, delete, and view-details operations.
- On the create-workflow page, the json needs to be split into workflow definition data and job relation data before being sent to the back-end API layer to save/update.
- On the workflow page, when dragging a task node, add an option to reference an existing job.
- Conditional branch nodes and delay nodes need to be resolved into the condition rule data on the relations; conversely, when querying a workflow, the condition rule data returned by the back end needs to be displayed as the corresponding nodes.

### 3.3 Master

When the Master schedules a workflow, <build the DAG from json> needs to be changed to <build the DAG from relation data>. When executing a workflow, first load the relation data in full (no job data is loaded at this point), generate the DAG, and while traversing the DAG for execution, fetch the job data that needs to be executed.

Other execution processes are consistent with the existing ones.
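A minimal sketch, under assumed names, of the "build the DAG from relation data" step: load only the relation rows of the workflow, build an adjacency list, and fetch job details lazily when a node is about to run. This is not the actual Master code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RelationEdge {
    String nodeCode;         // pre node (workflow code or job code)
    String postNodeCode;     // post node (job code)
    int conditionType;       // 0: none, 1: judgment condition, 2: delay condition
    String conditionParams;

    RelationEdge(String nodeCode, String postNodeCode, int conditionType, String conditionParams) {
        this.nodeCode = nodeCode;
        this.postNodeCode = postNodeCode;
        this.conditionType = conditionType;
        this.conditionParams = conditionParams;
    }
}

class RelationDag {
    // pre node code -> outgoing edges
    private final Map<String, List<RelationEdge>> adjacency = new HashMap<>();

    static RelationDag build(List<RelationEdge> relations) {
        RelationDag dag = new RelationDag();
        for (RelationEdge edge : relations) {
            dag.adjacency.computeIfAbsent(edge.nodeCode, k -> new ArrayList<>()).add(edge);
        }
        return dag;
    }

    /** Successor edges of a node; the Master would evaluate conditionType/conditionParams
     *  here and only then fetch the post job's definition for dispatch. */
    List<RelationEdge> successors(String nodeCode) {
        return adjacency.getOrDefault(nodeCode, List.of());
    }
}
```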
--------------------
DolphinScheduler (Incubator) Committer
Hemin Wen 温合民
[email protected]
--------------------
