I think Idea B is more suitable for the "fake run task".

And what  will happen if  I set  workflow control  nodes to 
"fake run"? 
Workflow control nodes include dependency node, condition node 
and sub-process node.


------------------ 原始邮件 ------------------
发件人:&nbsp;"wu shaoj"<[email protected]&gt;;
发送时间:&nbsp;2020年7月3日(星期五) 上午10:10
收件人:&nbsp;"[email protected]"<[email protected]&gt;;

主题:&nbsp;Re: Discuss on fake-run feature 任务空跑的方案讨论



I think 'handled on WorkerServer' is better.

On 2020/7/3, 07:35, "Pu Hsu" <[email protected]&gt; wrote:

&nbsp;&nbsp;&nbsp; Purpose: When we run a process, a task will be regarded as 
success directly if marked as "fake run".

&nbsp;&nbsp;&nbsp; Sub-goals:
&nbsp;&nbsp;&nbsp; 1. Marking a task as "fake run"
&nbsp;&nbsp;&nbsp; 2. Handling the fake-run task
&nbsp;&nbsp;&nbsp; 3. Providing the HTTP API

&nbsp;&nbsp;&nbsp; ## Subgoal 1 : Design

&nbsp;&nbsp;&nbsp; In the existing design, 
`org.apache.dolphinscheduler.common.model.TaskNode` stores information about a 
task in the workflow. The member `String runFlag` in TaskNode is used for 
running state, and runFlag has two types of `NORMAL` and `FORBIDDEN`. Tracking 
the references of the runFlag shows that currently only 
`TaskNode.isForbidden()` used it.

&nbsp;&nbsp;&nbsp; We're going to add a new `FAKE` type to runFlag.

&nbsp;&nbsp;&nbsp; ## Subgoal 2 : Design

&nbsp;&nbsp;&nbsp; The existing running flow of a process is:

&nbsp;&nbsp;&nbsp; 1. MasterServer builds TaskInstance from TaskNode
&nbsp;&nbsp;&nbsp; 2. MasterServer submits TaskInstance to database by 
MasterBaseTaskExecThread
&nbsp;&nbsp;&nbsp; 3. MasterServer dispatch TaskInstance
&nbsp;&nbsp;&nbsp; 3.1 Construct TaskExecutionContext from TaskInstance
&nbsp;&nbsp;&nbsp; 3.2 Construct Command from TaskExecutionContext
&nbsp;&nbsp;&nbsp; 3.3 Send Command to WorkerServer by NettyExecutorManager
&nbsp;&nbsp;&nbsp; 3.4 Wait for the result
&nbsp;&nbsp;&nbsp; 4. WorkerServer receives Command through 
NettyRemotingServer, and then processes Command of type `TASK_EXECUTE_REQUEST` 
and gets TaskExecutionContext from Command by TaskExecuteProcessor
&nbsp;&nbsp;&nbsp; 5. WorkerServer handles TaskExecutionContext by 
TaskExecuteThread
&nbsp;&nbsp;&nbsp; 6. WorkerServer returns the result to MasterServer by 
TaskCallbackService

&nbsp;&nbsp;&nbsp; Firstly, we must pass the `FAKE` mark to TaskInstance when 
creating TaskInstance from TaskNode.
&nbsp;&nbsp;&nbsp; After that, there are multiple candidate places to intercept 
and process the marked task in TaskInstance/TaskExecutionContext:

&nbsp;&nbsp;&nbsp; ### Idea A : Skip the task (handled on MasterServer)

&nbsp;&nbsp;&nbsp; Result: MasterServer creates the TaskInstance in memory 
only, then the task is passed and the process goes on. No submit to db, no 
Command, no dispatch, no wait, no WorkerServer involved.

&nbsp;&nbsp;&nbsp; Between step 1 and step 2 in the above flow, the 
MasterServer creates a subclass of MasterBaseTaskExecThread for each 
TaskInstance by `MasterExecThread.submitTaskExec()` to process TaskInstance 
asynchronously.

&nbsp;&nbsp;&nbsp; So we can get the `FAKE` mark and create a 
SkippedTaskExecThread here to skip the task and update the task status directly.

&nbsp;&nbsp;&nbsp; ### Idea B : Just do a fake-run (handled on WorkerServer)

&nbsp;&nbsp;&nbsp; Result: All regular things on the MasterServer run normally, 
the WorkerServer detects the FAKE mark and do a fake-run.

&nbsp;&nbsp;&nbsp; After the WorkerServer gets the FAKE mark from 
TaskExecutionContext, it doesn't invoke any real handler and returns success 
directly.

&nbsp;&nbsp;&nbsp; ## Subgoal 3 : Design

&nbsp;&nbsp;&nbsp; Among the existing HTTP API, the `/save` and `/update` in 
`ProcessDefinitionController` accept a `String processDefinitionJson` field. 
ApiApplicationServer generates a `ProcessData` from it by 
`JSONUtils.parseObject()`, which has a `List<TaskNode&gt; tasks` member.

&nbsp;&nbsp;&nbsp; That is: the runFlag is automatically parsed by the JSON 
library (fastjson), so no additional processing is required. The front-end 
should send a `FAKE` string here.

&nbsp;&nbsp;&nbsp; P.S. One possible improvement is to increase the validity 
check of this field.

&nbsp;&nbsp;&nbsp; ---

&nbsp;&nbsp;&nbsp; 目的:标记特定任务为“空跑(fake-run)”,使其不执行具体任务而直接视为运行成功,继续任务流。

&nbsp;&nbsp;&nbsp; 子目标:
&nbsp;&nbsp;&nbsp; 1. 标记空跑任务
&nbsp;&nbsp;&nbsp; 2. 处理空跑任务
&nbsp;&nbsp;&nbsp; 3. 提供后端接口

&nbsp;&nbsp;&nbsp; ## 子目标 1 的设计

&nbsp;&nbsp;&nbsp; 现有的设计中,`org.apache.dolphinscheduler.common.model.TaskNode` 
存储了工作流中的一项任务的信息。其中,`String runFlag` 意为运行状态,目前有 `NORMAL` 和 `FORBIDDEN` 两种。追踪 
runFlag 成员的引用可知,目前实际只有 `TaskNode.isForbidden()` 里用到了。

&nbsp;&nbsp;&nbsp; 本方案添加名为 `FAKE` 的新类型。

&nbsp;&nbsp;&nbsp; ## 子目标 2 的设计

&nbsp;&nbsp;&nbsp; 现有的任务处理流程是:

&nbsp;&nbsp;&nbsp; 1. MasterServer 从 TaskNode 构建 TaskInstance
&nbsp;&nbsp;&nbsp; 2. MasterServer 通过 MasterBaseTaskExecThread 提交 TaskInstance 
到数据库
&nbsp;&nbsp;&nbsp; 3. MasterServer 分派 TaskInstance
&nbsp;&nbsp;&nbsp; 3.1 从 TaskInstance 构建 TaskExecutionContext
&nbsp;&nbsp;&nbsp; 3.2 从 TaskExecutionContext 构建 Command
&nbsp;&nbsp;&nbsp; 3.3 MasterServer 通过 NettyExecutorManager 向 WorkerServer 发送 
Command
&nbsp;&nbsp;&nbsp; 3.4 MasterServer 等待并接收结果
&nbsp;&nbsp;&nbsp; 4. WorkerServer 通过 NettyRemotingServer 接收 Command,其中 
TaskExecuteProcessor 处理 `TASK_EXECUTE_REQUEST` 类型的 Command,并从 Command 中拿到 
TaskExecutionContext
&nbsp;&nbsp;&nbsp; 5. WorkerServer 由 TaskExecuteThread 处理 TaskExecutionContext
&nbsp;&nbsp;&nbsp; 6. WorkerServer 通过 TaskCallbackService 向 MasterServer 返回结果

&nbsp;&nbsp;&nbsp; 首先,在从 TaskNode 创建 TaskInstance 时将 FAKE 标记到 TaskInstance 上。
&nbsp;&nbsp;&nbsp; 之后,有多个地方可以拦截并处理 TaskInstance/TaskExecutionContext 里的标记:

&nbsp;&nbsp;&nbsp; ### 想法 A:在 MasterServer 端即跳过(skip)该任务

&nbsp;&nbsp;&nbsp; 结果:只有在 MasterServer 端做处理,创建了 TaskInstance 后即可结束流程处理下一个 
Task。TaskInstance 不写数据库,也没有 Command、不用等 Master-Worker 的交互,WorkerServer 不需任何处理。

&nbsp;&nbsp;&nbsp; 在上述任务处理流程中的 1、2 之间,MasterServer 通过 
`MasterExecThread.submitTaskExec()` 为每个 TaskInstance 创建了一个 
MasterBaseTaskExecThread 子类来异步处理 TaskInstance。

&nbsp;&nbsp;&nbsp; 因此,可以就在这里创建一个 SkippedTaskExecThread 直接更新任务状态。

&nbsp;&nbsp;&nbsp; ### 想法 B:在 WorkerServer 端空跑(fake run)该任务

&nbsp;&nbsp;&nbsp; 结果:所有 MasterServer 端的流程都正常进行,只是 WorkerServer 识别 FAKE 
标记并做一次空跑。

&nbsp;&nbsp;&nbsp; WorkerServer 从 TaskExecutionContext 里拿到 SKIPPED 
标记,不调用具体的处理器,直接返回成功。

&nbsp;&nbsp;&nbsp; ## 子目标 3 的设计

&nbsp;&nbsp;&nbsp; 现有的后端接口中,`ProcessDefinitionController` 的 `/save` `/update` 
接口都接受一个 `String processDefinitionJson` 字段。该字段通过 `JSONUtils.parseObject()` 产生 
`ProcessData`,它有一个 `List<TaskNode&gt; tasks` 成员。

&nbsp;&nbsp;&nbsp; 即:runFlag 字段是通过 JSON 库(fastjson)自动解析的,因此无需额外处理。只要前端传递 `FAKE` 
字符串即可。

&nbsp;&nbsp;&nbsp; 额外地,一个可能的改进是增加该字段的有效性检查。

Reply via email to