Repository: flink Updated Branches: refs/heads/master cb9e409b7 -> a2e6fb06c
[FLINK-5937] [doc] Add documentation about the stream task lifecycle Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2e6fb06 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2e6fb06 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2e6fb06 Branch: refs/heads/master Commit: a2e6fb06c404a31294c53903fbaeaac666403dc8 Parents: cb9e409 Author: kl0u <[email protected]> Authored: Tue Feb 28 15:41:37 2017 +0100 Committer: kl0u <[email protected]> Committed: Fri Mar 3 12:53:17 2017 +0100 ---------------------------------------------------------------------- docs/internals/task_lifecycle.md | 192 ++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a2e6fb06/docs/internals/task_lifecycle.md ---------------------------------------------------------------------- diff --git a/docs/internals/task_lifecycle.md b/docs/internals/task_lifecycle.md new file mode 100644 index 0000000..cc557a1 --- /dev/null +++ b/docs/internals/task_lifecycle.md @@ -0,0 +1,192 @@ +--- +title: "Task Lifecycle" +nav-title: Task Lifecycle +nav-parent_id: internals +nav-pos: 5 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed +As an example, an operator with a parallelism of *5* will have each of its instances executed by a separate task. + +The `StreamTask` is the base for all different task sub-types in Flink's streaming engine. This document goes through +the different phases in the lifecycle of the `StreamTask` and describes the main methods representing each of these +phases. + +* This will be replaced by the TOC +{:toc} + +## Operator Lifecycle in a nutshell + +Because the task is the entity that executes a parallel instance of an operator, its lifecycle is tightly integrated +with that of an operator. So, we will briefly mention the basic methods representing the lifecycle of an operator before +diving into those of the `StreamTask` itself. The list is presented below in the order that each of the methods is called. +Given that an operator can have a user-defined function (*UDF*), below each of the operator methods we also present +(indented) the methods in the lifecycle of the UDF that it calls. These methods are available if your operator extends +the `AbstractUdfStreamOperator`, which is the basic class for all operators that execute UDFs. + + // initialization phase + OPERATOR::setup + UDF::setRuntimeContext + OPERATOR::initializeState + OPERATOR::open + UDF::open + + // processing phase (called on every element/watermark) + OPERATOR::processElement + UDF::run + OPERATOR::processWatermark + + // checkpointing phase (called asynchronously on every checkpoint) + OPERATOR::snapshotState + + // termination phase + OPERATOR::close + UDF::close + OPERATOR::dispose + +In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and +its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the + `open()` method executes any operator-specific initialization, such as opening the user-defined function in the case of +the `AbstractUdfStreamOperator`. + +<span class="label label-danger">Attention</span> The `initializeState()` contains both the logic for initializing the +state of the operator during its initial execution (*e.g.* register any keyed state), and also the logic to retrieve its +state from a checkpoint after a failure. More about this on the rest of this page. + +Now that everything is set, the operator is ready to process incoming data. Incoming elements can be one of the following: +input elements, watermark, and checkpoint barriers. Each one of them has a special element for handling it. Elements are +processed by the `processElement()` method, watermarks by the `processWatermark()`, and checkpoint barriers trigger a +checkpoint which invokes (asynchronously) the `snapshotState()` method, which we describe below. For each incoming element, +depending on its type one of the aforementioned methods is called. Note that the `processElement()` is also the place +where the UDF's logic is invoked, *e.g.* the `map()` method of your `MapFunction`. + +Finally, in the case of a normal, fault-free termination of the operator (*e.g.* if the stream is finite and its end is +reached), the `close()` method is called to perform any final bookkeeping action required by the operator's logic (*e.g.* +close any connections or I/O streams opened during the operator's execution), and the `dispose()` is called after that +to free any resources held by the operator (*e.g.* native memory held by the operator's data). + +In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()` +and skips any intermediate phases between the phase the operator was in when the failure happened and the `dispose()`. + +**Checkpoints:** The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described +above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, *i.e.* after the +operator is opened and before it is closed. The responsibility of this method is to store the current state of the operator +to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it will be retrieved when +the job resumes execution after a failure. Below we include a brief description of Flink's checkpointing mechanism, +and for a more detailed discussion on the principles around checkpointing in Flink please read the corresponding documentation: +[Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html). + +## Task Lifecycle + +Following that brief introduction on the operator's main phases, this section describes in more detail how a task calls +the respective methods during its execution on a cluster. The sequence of the phases described here is mainly included +in the `invoke()` method of the `StreamTask` class. The remainder of this document is split into two subsections, one +describing the phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and +(a shorter) one describing the different sequence followed in case the task is cancelled (see [Interrupted Execution](#interrupted-execution)), +either manually, or due some other reason, *e.g.* an exception thrown during execution. + +### Normal Execution + +The steps a task goes through when executed until completion without being interrupted are illustrated below: + + TASK::setInitialState + TASK::invoke + create basic utils (config, etc) and load the chain of operators + setup-operators + task-specific-init + initialize-operator-states + open-operators + run + close-operators + dispose-operators + task-specific-cleanup + common-cleanup + +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very +first step for the task is to retrieve its initial, task-wide state. This is done in the `setInitialState()`, and it is +particularly important in two cases: + +1. when the task is recovering from a failure and restarts from the last successful checkpoint +2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html). + +If it is the first time the task is executed, the initial task state is empty. + +After recovering any initial state, the task goes into its `invoke()` method. There, it first initializes the operators +involved in the local computation by calling the `setup()` method of each one of them and then performs its task-specific +initialization by calling the local `init()` method. By task-specific, we mean that depending on the type of the task +(`SourceTask`, `OneInputStreamTask` or `TwoInputStreamTask`, etc), this step may differ, but in any case, here is where +the necessary task-wide resources are acquired. As an example, the `OneInputStreamTask`, which represents a task that +expects to have a single input stream, initializes the connection(s) to the location(s) of the different partitions of +the input stream that are relevant to the local task. + +Having acquired the necessary resources, it is time for the different operators and user-defined functions to acquire +their individual state from the task-wide state retrieved above. This is done in the `initializeState()` method, which +calls the `initializeState()` of each individual operator. This method should be overridden by every stateful operator +and should contain the state initialization logic, both for the first time a job is executed, and also for the case when +the task recovers from a failure or when using a savepoint. + +Now that all operators in the task have been initialized, the `open()` method of each individual operator is called by +the `openAllOperators()` method of the `StreamTask`. This method performs all the operational initialization, +such as registering any retrieved timers with the timer service. A single task may be executing multiple operators with one +consuming the output of its predecessor. In this case, the `open()` method is called from the last operator, *i.e.* the +one whose output is also the output of the task itself, to the first. This is done so that when the first operator starts +processing the task's input, all downstream operators are ready to receive its output. + +<span class="label label-danger">Attention</span> Consecutive operators in a task are opened from the last to the first. + +Now the task can resume execution and operators can start processing fresh input data. This is the place where the +task-specific `run()` method is called. This method will run until either there is no more input data (finite stream), +or the task is cancelled (manually or not). Here is where the operator specific `processElement()` and `processWatermark()` +methods are called. + +In the case of running till completion, *i.e.* there is no more input data to process, after exiting from the `run()` +method, the task enters its shutdown process. Initially, the timer service stops registering any new timers (*e.g.* from +fired timers that are being executed), clears all not-yet-started timers, and awaits the completion of currently +executing timers. Then the `closeAllOperators()` tries to gracefully close the operators involved in the computation by +calling the `close()` method of each operator. Then, any buffered output data is flushed so that they can be processed +by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the +`dispose()` method of each one. When opening the different operators, we mentioned that the order is from the +last to the first. Closing happens in the opposite manner, from first to last. + +<span class="label label-danger">Attention</span> Consecutive operators in a task are closed from the first to the last. + +Finally, when all operators have been closed and all their resources freed, the task shuts down its timer service, +performs its task-specific cleanup, *e.g.* cleans all its internal buffers, and then performs its generic task clean up +which consists of closing all its output channels and cleaning any output buffers. + +**Checkpoints:** Previously we saw that during `initializeState()`, and in case of recovering from a failure, the task +and all its operators and functions retrieve the state that was persisted to stable storage during the last successful +checkpoint before the failure. Checkpoints in Flink are performed periodically based on a user-specified interval, and +are performed by a different thread than that of the main task thread. That's why they are not included in the main +phases of the task lifecycle. In a nutshell, special elements called `CheckpointBarriers` are injected periodically by +the source tasks of a job in the stream of input data, and travel with the actual data from source to sink. A source +task injects these barriers after it is in running mode, and assuming that the `CheckpointCoordinator` is also running. +Whenever a task receives such a barrier, it schedules a task to be performed by the checkpoint thread, which calls the +`snapshotState()` of the operators in the task. Input data can still be received by the task while the checkpoint is +being performed, but the data is buffered and only processed and emitted downstream after the checkpoint is successfully +completed. + +### Interrupted Execution + +In the previous sections we described the lifecycle of a task that runs till completion. In case the task is cancelled +at any point, then the normal execution is interrupted and the only operations performed from that point on are the timer +service shutdown, the task-specific cleanup, the disposal of the operators, and the general task cleanup, as described +above.
