klion26 commented on a change in pull request #8622: 
[FLINK-12438][doc-zh]Translate Task Lifecycle document into Chinese
URL: https://github.com/apache/flink/pull/8622#discussion_r297073638
 
 

 ##########
 File path: docs/internals/task_lifecycle.zh.md
 ##########
 @@ -23,172 +23,97 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-A task in Flink is the basic unit of execution. It is the place where each 
parallel instance of an operator is executed
-As an example, an operator with a parallelism of *5* will have each of its 
instances executed by a separate task. 
+Task 是 Flink 的基本执行单元。算子的每个并行实例都是在 task 里执行的。举个例子,一个并行度为 5 的算子,它的每个实例都由一个单独的 
task 来执行。
 
-The `StreamTask` is the base for all different task sub-types in Flink's 
streaming engine. This document goes through 
-the different phases in the lifecycle of the `StreamTask` and describes the 
main methods representing each of these 
-phases.
+在 Flink 流式计算引擎里,`StreamTask` 是所有不同子类型 task 的基础。这篇文档会深入 `StreamTask` 
生命周期的不同阶段,介绍每个阶段的主要方法。
 
 * This will be replaced by the TOC
 {:toc}
 
-## Operator Lifecycle in a nutshell
+## 算子生命周期简介
 
-Because the task is the entity that executes a parallel instance of an 
operator, its lifecycle is tightly integrated 
-with that of an operator. So, we will briefly mention the basic methods 
representing the lifecycle of an operator before 
-diving into those of the `StreamTask` itself. The list is presented below in 
the order that each of the methods is called. 
-Given that an operator can have a user-defined function (*UDF*), below each of 
the operator methods we also present 
-(indented) the methods in the lifecycle of the UDF that it calls. These 
methods are available if your operator extends 
-the `AbstractUdfStreamOperator`, which is the basic class for all operators 
that execute UDFs.
+因为 task 是算子并行实例的执行实体,所以它的生命周期跟算子的生命周期紧紧联系到一起。因此,在深入介绍 `StreamTask` 
生命周期之前,先简要介绍一下代表算子生命周期的各个基本方法。这些方法列表按调用的先后顺序排列如下所示。考虑到算子可能有用户自定义函数(*UDF*),我们也提到了
 UDF 生命周期里调用的各个方法。如果你的算子继承了 `AbstractUdfStreamOperator` 
的话,这些方法都是可用的,`AbstractUdfStreamOperator` 是所有继承 UDF 算子的基类。
 
-        // initialization phase
+        // 初始化阶段
         OPERATOR::setup
-            UDF::setRuntimeContext
+             UDF::setRuntimeContext
         OPERATOR::initializeState
         OPERATOR::open
-            UDF::open
+             UDF::open
         
-        // processing phase (called on every element/watermark)
+        // 调用处理阶段(通过每条数据或 watermark 来调用)
         OPERATOR::processElement
-            UDF::run
+             UDF::run
         OPERATOR::processWatermark
         
-        // checkpointing phase (called asynchronously on every checkpoint)
+        // checkpointing 阶段(通过每个 checkpoint 异步调用)
         OPERATOR::snapshotState
                 
-        // termination phase
+        // 结束阶段
         OPERATOR::close
-            UDF::close
+             UDF::close
         OPERATOR::dispose
-    
-In a nutshell, the `setup()` is called to initialize some operator-specific 
machinery, such as its `RuntimeContext` and 
-its metric collection data-structures. After this, the `initializeState()` 
gives an operator its initial state, and the 
- `open()` method executes any operator-specific initialization, such as 
opening the user-defined function in the case of 
-the `AbstractUdfStreamOperator`. 
-
-<span class="label label-danger">Attention</span> The `initializeState()` 
contains both the logic for initializing the 
-state of the operator during its initial execution (*e.g.* register any keyed 
state), and also the logic to retrieve its
-state from a checkpoint after a failure. More about this on the rest of this 
page.
-
-Now that everything is set, the operator is ready to process incoming data. 
Incoming elements can be one of the following: 
-input elements, watermark, and checkpoint barriers. Each one of them has a 
special element for handling it. Elements are 
-processed by the `processElement()` method, watermarks by the 
`processWatermark()`, and checkpoint barriers trigger a 
-checkpoint which invokes (asynchronously) the `snapshotState()` method, which 
we describe below. For each incoming element,
-depending on its type one of the aforementioned methods is called. Note that 
the `processElement()` is also the place 
-where the UDF's logic is invoked, *e.g.* the `map()` method of your 
`MapFunction`.
-
-Finally, in the case of a normal, fault-free termination of the operator 
(*e.g.* if the stream is finite and its end is 
-reached), the `close()` method is called to perform any final bookkeeping 
action required by the operator's logic (*e.g.* 
-close any connections or I/O streams opened during the operator's execution), 
and the `dispose()` is called after that 
-to free any resources held by the operator (*e.g.* native memory held by the 
operator's data). 
-
-In the case of a termination due to a failure or due to manual cancellation, 
the execution jumps directly to the `dispose()` 
-and skips any intermediate phases between the phase the operator was in when 
the failure happened and the `dispose()`.
-
-**Checkpoints:** The `snapshotState()` method of the operator is called 
asynchronously to the rest of the methods described 
-above whenever a checkpoint barrier is received. Checkpoints are performed 
during the processing phase, *i.e.* after the 
-operator is opened and before it is closed. The responsibility of this method 
is to store the current state of the operator 
-to the specified [state backend]({{ site.baseurl 
}}/ops/state/state_backends.html) from where it will be retrieved when 
-the job resumes execution after a failure. Below we include a brief 
description of Flink's checkpointing mechanism, 
-and for a more detailed discussion on the principles around checkpointing in 
Flink please read the corresponding documentation: 
-[Data Streaming Fault Tolerance]({{ site.baseurl 
}}/internals/stream_checkpointing.html).
-
-## Task Lifecycle
-
-Following that brief introduction on the operator's main phases, this section 
describes in more detail how a task calls 
-the respective methods during its execution on a cluster. The sequence of the 
phases described here is mainly included 
-in the `invoke()` method of the `StreamTask` class. The remainder of this 
document is split into two subsections, one 
-describing the phases during a regular, fault-free execution of a task (see 
[Normal Execution](#normal-execution)), and 
-(a shorter) one describing the different sequence followed in case the task is 
cancelled (see [Interrupted Execution](#interrupted-execution)), 
-either manually, or due some other reason, *e.g.* an exception thrown during 
execution.
-
-### Normal Execution
-
-The steps a task goes through when executed until completion without being 
interrupted are illustrated below:
-
-           TASK::setInitialState
-           TASK::invoke
-           create basic utils (config, etc) and load the chain of operators
-           setup-operators
-           task-specific-init
-           initialize-operator-states
-                   open-operators
-           run
-           close-operators
-           dispose-operators
-           task-specific-cleanup
-           common-cleanup
-
-As shown above, after recovering the task configuration and initializing some 
important runtime parameters, the very 
-first step for the task is to retrieve its initial, task-wide state. This is 
done in the `setInitialState()`, and it is 
-particularly important in two cases:
-
-1. when the task is recovering from a failure and restarts from the last 
successful checkpoint
-2. when resuming from a [savepoint]({{ site.baseurl 
}}/ops/state/savepoints.html). 
-
-If it is the first time the task is executed, the initial task state is empty. 
-
-After recovering any initial state, the task goes into its `invoke()` method. 
There, it first initializes the operators 
-involved in the local computation by calling the `setup()` method of each one 
of them and then performs its task-specific 
-initialization by calling the local `init()` method. By task-specific, we mean 
that depending on the type of the task 
-(`SourceTask`, `OneInputStreamTask` or `TwoInputStreamTask`, etc), this step 
may differ, but in any case, here is where 
-the necessary task-wide resources are acquired. As an example, the 
`OneInputStreamTask`, which represents a task that 
-expects to have a single input stream, initializes the connection(s) to the 
location(s) of the different partitions of 
-the input stream that are relevant to the local task.
-
-Having acquired the necessary resources, it is time for the different 
operators and user-defined functions to acquire 
-their individual state from the task-wide state retrieved above. This is done 
in the `initializeState()` method, which 
-calls the `initializeState()` of each individual operator. This method should 
be overridden by every stateful operator 
-and should contain the state initialization logic, both for the first time a 
job is executed, and also for the case when 
-the task recovers from a failure or when using a savepoint.
-
-Now that all operators in the task have been initialized, the `open()` method 
of each individual operator is called by 
-the `openAllOperators()` method of the `StreamTask`. This method performs all 
the operational initialization, 
-such as registering any retrieved timers with the timer service. A single task 
may be executing multiple operators with one 
-consuming the output of its predecessor. In this case, the `open()` method is 
called from the last operator, *i.e.* the 
-one whose output is also the output of the task itself, to the first. This is 
done so that when the first operator starts 
-processing the task's input, all downstream operators are ready to receive its 
output.
-
-<span class="label label-danger">Attention</span> Consecutive operators in a 
task are opened from the last to the first.
-
-Now the task can resume execution and operators can start processing fresh 
input data. This is the place where the 
-task-specific `run()`  method is called. This method will run until either 
there is no more input data (finite stream), 
-or the task is cancelled (manually or not). Here is where the operator 
specific `processElement()` and `processWatermark()` 
-methods are called.
-
-In the case of running till completion, *i.e.* there is no more input data to 
process, after exiting from the `run()` 
-method, the task enters its shutdown process. Initially, the timer service 
stops registering any new timers (*e.g.* from 
-fired timers that are being executed), clears all not-yet-started timers, and 
awaits the completion of currently 
-executing timers. Then the `closeAllOperators()` tries to gracefully close the 
operators involved in the computation by 
-calling the `close()` method of each operator. Then, any buffered output data 
is flushed so that they can be processed 
-by the downstream tasks, and finally the task tries to clear all the resources 
held by the operators by calling the 
-`dispose()` method of each one. When opening the different operators, we 
mentioned that the order is from the 
-last to the first. Closing happens in the opposite manner, from first to last.
-
-<span class="label label-danger">Attention</span> Consecutive operators in a 
task are closed from the first to the last.
-
-Finally, when all operators have been closed and all their resources freed, 
the task shuts down its timer service, 
-performs its task-specific cleanup, *e.g.* cleans all its internal buffers, 
and then performs its generic task clean up 
-which consists of closing all its output channels and cleaning any output 
buffers.
-
-**Checkpoints:** Previously we saw that during `initializeState()`, and in 
case of recovering from a failure, the task 
-and all its operators and functions retrieve the state that was persisted to 
stable storage during the last successful 
-checkpoint before the failure. Checkpoints in Flink are performed periodically 
based on a user-specified interval, and 
-are performed by a different thread than that of the main task thread. That's 
why they are not included in the main 
-phases of the task lifecycle. In a nutshell, special elements called 
`CheckpointBarriers` are injected periodically by 
-the source tasks of a job in the stream of input data, and travel with the 
actual data from source to sink. A source 
-task injects these barriers after it is in running mode, and assuming that the 
`CheckpointCoordinator` is also running. 
-Whenever a task receives such a barrier, it schedules a task to be performed 
by the checkpoint thread, which calls the 
-`snapshotState()` of the operators in the task. Input data can still be 
received by the task while the checkpoint is 
-being performed, but the data is buffered and only processed and emitted 
downstream after the checkpoint is successfully 
-completed.
-
-### Interrupted Execution
-
-In the previous sections we described the lifecycle of a task that runs till 
completion. In case the task is cancelled 
-at any point, then the normal execution is interrupted and the only operations 
performed from that point on are the timer 
-service shutdown, the task-specific cleanup, the disposal of the operators, 
and the general task cleanup, as described 
-above.
-
-{% top %}
+
+简而言之,调用 `setup()` 是初始化算子级别的组件,比如 `RuntimeContext` 和 
指标收集的数据结构。在这之后,`initializeState()` 给算子提供初始状态,
+ `open()` 方法执行所有算子级别的初始化,比如在继承 `AbstractUdfStreamOperator` 的情况下,打开用户定义的函数。
+
+<span class="label label-danger">注意</span> `initializeState()` 既包含在初始执行时(比如注册 
keyed 状态)初始化算子状态的逻辑,又包含作业失败后从 checkpoint 中恢复原有状态的逻辑。在接下来的篇幅会更详细的介绍这块。
+
+当所有初始化都设置之后,算子开始准备处理即将流入的数据。流入的数据可以分为三种类型:正常输入元素、水位 和 checkpoint 
屏障。每种类型的数据都有单独的方法来处理。正常输入元素通过 `processElement()` 方法来处理,水位通过 
`processWatermark()` 来处理,checkpoint 屏障会触发异步执行的 `snapshotState()` 方法来进行 
checkpoint。`processElement()`方法也是用户自定义函数逻辑执行的地方,比如用户自定义 `MapFunction` 里的 
`map()` 方法。
+
+最后,在正常无失败的情况下结束算子(比如,如果流式数据是有限的,并且最后一个数据已经到了)会调用 `close()` 
方法,进行算子逻辑(比如关闭算子执行期间打开的连接或 I/O 流)要求的最终簿记工作。在这之后会调用 `dispose()` 
方法来释放算子持有的资源(比如算子数据持有的本地内存)。
+
+在作业失败或手动取消的情况下结束算子,整个执行过程会直接跳到 `dispose()` 方法,跳过算子在故障发生时所处阶段和 `dispose()` 
之间的中间数据阶段。
+
+**Checkpoints:** 算子的 `snapshotState()` 方法是在收到 checkpoint 屏障后异步调用的。Checkpoint 
是在处理阶段执行的,即算子打开之后,结束之前的这个阶段。这个方法的职责就是存储算子的当前状态到一个特定的[状态后端]({{ site.baseurl 
}}/ops/state/state_backends.html),当作业失败后恢复执行时会从这个后端读取状态数据。下面我们简要描述了 Flink 的 
checkpoint 机制,如果想了解更多 Flink checkpoint 相关的原理,可以读一读 [数据流容错]({{ site.baseurl 
}}/internals/stream_checkpointing.html)。
+
+## Task 生命周期
+
+在上文对算子主要阶段的简要介绍之后,本节将详细介绍 task 在集群执行期间是如何调用相关方法的。这里所说的阶段序列主要包含在 `StreamTask` 
类的 `invoke()` 方法里。本文档的剩余部分会分成两个子章节,其中一节描述了 task 在常规没有失败执行的阶段(请参考 
[常规执行](#normal-execution)),另外一节描述了 task 取消之后的不同阶段序列(请参考 
[中断执行](#interrupted-execution)),不管是手动取消还是其他原因导致的取消,比如执行期间的异常。
+
+### 常规执行
+
+Task 在没有中断的情况下执行直到最终完成所经历的步骤如下所示:
+
+        TASK::setInitialState
+        TASK::invoke
+            create basic utils (config, etc) and load the chain of operators
+            setup-operators
+            task-specific-init
+            initialize-operator-states
+            open-operators
+            run
+            close-operators
+            dispose-operators
+            task-specific-cleanup
+            common-cleanup
+
+如上所示,在恢复 task 配置和初始化一些重要的运行时参数之后,task 的第一步就是读取它初始的、task 级别的状态数据。这一步是在 
`setInitialState()` 方法里完成的,在下面两个场景特别重要:
+1. 当 Task 从失败中恢复,从最后一次成功的 checkpoint 重启的时候
+2. 当 Task 从 [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) 里恢复的时候。
+
+如果 task 是第一次执行的话,它的初始状态为空。
+
+在恢复初始状态之后,task 进入到 `invoke()` 方法。在这里,首先调用 `setup()` 方法来初始化本地计算涉及到的每个算子,然后调用本地的 
`init()` 方法来做 特定 task 的初始化。这里所说的特定 task,取决于 task 的类型 
(`SourceTask`,`OneInputStreamTask` 或 `TwoInputStreamTask` 等等)。这一步可能会有差异,但却是申请 
task 范围所需资源的地方。举个例子,`OneInputStreamTask` 代表着只接受一个输入流的 task,它初始化输入流不同分区的连接位置。
+
+在申请到必要的资源之后,不同算子和用户定义函数开始从上面读到的 task 范围状态数据里获取他们各自的状态值。这块工作是在各个算子里调用 
`initializeState()` 方法完成的。每个有状态的算子都应该重写该方法,包含状态初始化的逻辑,既适用于作业第一次执行的场景,又适用于 task 
从失败或 savepoint 中恢复的场景。
+
+现在 task 里的所有算子都已经被初始化了,每个算子里的 `open()` 方法也通过 `StreamTask` 的 
`openAllOperators()` 方法调用了。这个方法进行所有操作的初始化,比如在定时器服务里注册获取到的定时器。一个 task 
可能会执行多个算子,即一个算子消费它之前算子的输出数据流。在这种情况下,`open()` 方法通过最后一个算子来调用,即算子的输出刚好也是整个 task 
的输出。当第一个算子开始处理 task 的输入数据流时,所有下游算子已经准备接收它的输出数据了。
+
+<span class="label label-danger">注意</span> task 里多个连续算子是从后往前依次开启的。
+
+现在 task 可以恢复执行,算子可以开始处理新输入的数据。在这里,task 级别的 `run()` 
会被调用。这个方法会一直运行直到没有更多输入数据进来(有限的数据流)或者 task 被取消了(人为的或其他的原因)。这里是算子级别的 
`processElement()` 方法和 `processWatermark()` 方法执行的地方。
+
+在运行到完成的情况下,即没有输入数据需要处理,在退出 `run()`  方法之后,task 
进入到关闭阶段。首先定时器服务停止注册任何新的定时器(比如从正在执行的定时器里注册),清理掉所有还未启动的定时器,等待当前执行中的定时器运行结束。`closeAllOperators()`
 方法通过调用每个算子的 `close()` 方法来优雅的关掉所有参与计算的算子。然后所有缓存的输出数据会刷出去以便下游 task 处理,最终 task 
通过调用每个算子的 `dispose()` 
方法来尝试清理掉算子持有的所有资源。之前我们提到不同算子开启时,是从后往前依次调用;关闭时刚好相反,从前往后依次调用。
+
+<span class="label label-danger">注意</span> task 里的多个连续算子关闭时是从前往后依次进行。
+
+最后,当所有算子都已经关闭,所有资源都已被释放时,task 关掉它的定时器服务,进行 task 级别的清理操作,即清理掉所有内部缓存,然后进行常规的 
task 清理操作,其中包括关闭所有的输出管道,清理所有输出缓存等。 
+
+**Checkpoints:** 之前我们看到在执行 `initializeState()` 方法期间,在从异常失败中恢复的情况下,task 
和它内部的所有算子函数都从最后一次成功的 checkpoint 数据里获取对应的状态信息。Flink 里的 checkpoint 
是根据用户自定义的时间间隔定时执行的,在一个单独的线程里进行的,与执行算子操作的 task 的主线程不同。这也是我们没有把 checkpoint 过程涵盖在 
task 生命周期主要阶段里的原因。简而言之,Flink 作业的输入数据流 task 会定时插入一种叫 `checkpoint 屏障` 
的特殊数据,并跟正常数据一起从数据源头流入到最终落盘。数据源头 task 在进入运行模式后会插入这些屏障数据,假设 `checkpoint协调者` 
也在运行中。当 task 接收到这样的屏障之后,会通过 task 算子里的 `snapshotState()` 方法调度 checkpoint 
线程执行具体任务。在 checkpoint 处理期间,task 依然可以接收输入数据,但是数据会被缓存起来,当 checkpoint 
执行成功之后才会被处理和发送到下游算子。 
 
 Review comment:
   ```suggestion
   **Checkpoints:** 之前我们看到在执行 `initializeState()` 方法期间,在从异常失败中恢复的情况下,task 
和它内部的所有算子函数都从最后一次成功的 checkpoint 数据里获取对应的状态信息。Flink 里的 checkpoint 
是根据用户自定义的时间间隔定时执行的,在一个单独的线程里进行,与执行算子操作的 task 的主线程不同。这也是我们没有把 checkpoint 过程涵盖在 
task 生命周期主要阶段里的原因。简而言之,Flink 作业的输入数据流 task 会定时插入一种叫 `checkpoint barrier` 
的特殊数据,并跟正常数据一起从数据源头流入到汇算子。在 `CheckpointCoordinator` 运行过程中,数据源头 task 会插入这些 
barrier 数据。当 task 接收到这样的 barrier 之后,会通过 task 算子里的 `snapshotState()` 方法调度 
checkpoint 线程执行具体任务。在 checkpoint 处理期间,task 依然可以接收输入数据,但是数据会被缓存起来,当 checkpoint 
执行成功之后才会被处理和发送到下游算子。 
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to