[
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422848#comment-15422848
]
ASF GitHub Bot commented on FLINK-3755:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/2376
[FLINK-3755] Introduce key groups for key-value state to support dynamic
scaling
This pull request introduces the concept of key groups to Flink. A key
group is the smallest unit of key-value state that can be assigned to a stream
operator. Put differently, it is a subset of the key space that is assigned to
a stream operator as a whole. With the introduction of key groups, it becomes
possible to dynamically scale Flink operators that use partitioned
(= key-value) state.
In particular, this pull request addresses the following sub-issues:
- fully: [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism
Parameter
- partially: [FLINK-4381] Refactor State to Prepare For Key-Group State
Backends
Furthermore, this pull request is partially based on pull request #1988.
Overall, this pull request introduces the following changes:
# 1) Adopted from #1988 (plus the distribution of keys as ranges, `KeyGroupRange`)
## a) Introduction of KeyGroupAssigner
In order to partition keys into key groups, the `KeyGroupAssigner` interface
is introduced. This allows for partitioning the key space into smaller units
which can be assigned to operators. Scaling the parallelism up or down is then
performed by simply reassigning the key groups to more or fewer operators.
In this pull request, the former `HashPartitioner` is renamed to
`KeyGroupStreamPartitioner` and uses the `KeyGroupAssigner` to distribute the
streaming records consistently with the key group mappings. The key groups, in
turn, are mapped as ranges of key groups to the downstream tasks
(`key group index * parallelism / number of key groups` = outgoing channel).
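As an illustration of the routing described above, the following is a minimal
sketch, not the code from this pull request. The class and method names
(`KeyGroupRoutingSketch`, `assignToKeyGroup`, `selectChannel`) are hypothetical,
and mapping a key to its key group via `hashCode` modulo the number of key
groups is an assumption; only the channel formula is taken from the description.

```java
// Hypothetical sketch; names are illustrative and not taken from the pull request.
final class KeyGroupRoutingSketch {

    // Assumption: a key is mapped to its key group by a non-negative hash
    // modulo the number of key groups. The real KeyGroupAssigner may differ.
    static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    // Formula from the description, evaluated with integer division:
    // key group index * parallelism / number of key groups = outgoing channel.
    static int selectChannel(int keyGroupIndex, int parallelism, int numberOfKeyGroups) {
        return keyGroupIndex * parallelism / numberOfKeyGroups;
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 8;
        for (int parallelism : new int[] {2, 4}) {
            StringBuilder line = new StringBuilder("parallelism " + parallelism + ":");
            for (int keyGroup = 0; keyGroup < numberOfKeyGroups; keyGroup++) {
                int channel = selectChannel(keyGroup, parallelism, numberOfKeyGroups);
                line.append(" kg").append(keyGroup).append("->").append(channel);
            }
            System.out.println(line);
        }
    }
}
```

Because the key-to-key-group step does not depend on the parallelism, a key
stays in the same key group after rescaling; only the key-group-to-channel step
changes.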
When restoring from a checkpoint or savepoint, scaling the parallelism up or
down boils down to splitting or merging the key group ranges, in alignment
with the adjusted assignment to operators that the
`KeyGroupStreamPartitioner` performs automatically.
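To make the splitting and merging of ranges concrete, here is a small sketch
under stated assumptions. It derives the inclusive range of key groups that an
operator index owns by inverting the formula `key group index * parallelism /
number of key groups` with integer division. The class `KeyGroupRangeSketch`
and its method are hypothetical and do not reproduce the `KeyGroupRange` class
of the pull request.

```java
// Hypothetical sketch; not the KeyGroupRange implementation from the pull request.
final class KeyGroupRangeSketch {

    // Returns {first, last} (inclusive) key group indices for which
    // keyGroup * parallelism / numberOfKeyGroups == operatorIndex.
    static int[] rangeForOperator(int numberOfKeyGroups, int parallelism, int operatorIndex) {
        // ceil(operatorIndex * numberOfKeyGroups / parallelism)
        int first = (operatorIndex * numberOfKeyGroups + parallelism - 1) / parallelism;
        // ceil((operatorIndex + 1) * numberOfKeyGroups / parallelism) - 1
        int last = ((operatorIndex + 1) * numberOfKeyGroups + parallelism - 1) / parallelism - 1;
        return new int[] {first, last};
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 8;
        // Going from parallelism 2 to 4 splits each range in two;
        // going from 4 to 2 merges neighbouring ranges.
        for (int parallelism : new int[] {2, 4}) {
            for (int operator = 0; operator < parallelism; operator++) {
                int[] range = rangeForOperator(numberOfKeyGroups, parallelism, operator);
                System.out.println("parallelism " + parallelism + ", operator " + operator
                        + ": key groups " + range[0] + ".." + range[1]);
            }
        }
    }
}
```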
## b) Introduction of MaxParallelism to user API
In order to scale programs up or down, it is necessary to define the
maximum number of key groups. The maximum number of key groups is also the
maximum parallelism of an operator, because every operator needs at least one
key group to get elements assigned. To specify this upper limit, the
ExecutionConfig allows setting a job-wide max parallelism value via
ExecutionConfig.setMaxParallelism. In addition, the
SingleOutputStreamOperator allows setting a max parallelism value per
operator, analogous to the parallelism. If neither the operator-level nor the
job-wide max parallelism has been set, the parallelism of the operator is
taken as the max parallelism, so every operator then receives a single key
group. Currently, we restrict the maximum number of key groups to
Short.MAX_VALUE (2^15 - 1).
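The fallback order described above can be sketched as follows. This is plain
Java for illustration only, not Flink code: the class `MaxParallelismSketch`,
the `UNSET` marker value, and the validation rules are assumptions. Only the
precedence (operator setting, then job-wide setting, then the operator's
parallelism) and the `Short.MAX_VALUE` limit come from the description.

```java
// Hypothetical sketch of the max parallelism fallback; not Flink code.
final class MaxParallelismSketch {

    // Assumption: -1 marks a value that has not been set.
    static final int UNSET = -1;

    // Upper limit named in the description.
    static final int UPPER_BOUND = Short.MAX_VALUE;

    static int resolveMaxParallelism(int operatorMax, int jobWideMax, int parallelism) {
        int resolved;
        if (operatorMax != UNSET) {
            resolved = operatorMax;   // operator-level setting wins
        } else if (jobWideMax != UNSET) {
            resolved = jobWideMax;    // otherwise the job-wide setting
        } else {
            resolved = parallelism;   // otherwise one key group per operator
        }
        // Assumption: values below the parallelism or above the limit are rejected.
        if (resolved < parallelism || resolved > UPPER_BOUND) {
            throw new IllegalArgumentException("Invalid max parallelism: " + resolved);
        }
        return resolved;
    }

    public static void main(String[] args) {
        System.out.println(resolveMaxParallelism(128, 64, 4));
        System.out.println(resolveMaxParallelism(UNSET, 64, 4));
        System.out.println(resolveMaxParallelism(UNSET, UNSET, 4));
    }
}
```

With the last call, the job could never be scaled beyond a parallelism of 4,
which is why setting a max parallelism explicitly matters for rescaling.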
# 2) State and StateHandle refactoring
## a) StateHandle refactoring
We have simplified and cleaned up the hierarchy and use cases of state
handles. `StreamStateHandle` and `RetrievableStateHandle` are now at the heart
of the state handles system.
Their main conceptual difference is that `StreamStateHandle` provides a
seekable input stream to the actual state data and leaves state reconstruction
to client code, whereas `RetrievableStateHandle` gives client code a simple
way to retrieve state as a readily constructed object, with the state handle
implementation taking care of state reconstruction.
## b) Operator serialization
The unified abstraction for operators to persist their state is
`CheckpointStateOutputStream`. All operators can write their serialized state
directly into this stream, which returns a `StreamStateHandle` on close.
`StreamTaskState` and `StreamTaskStateList` become obsolete. This change
makes versioning of operator state serialization formats easier, and we should
ensure and test that our operators are aware of serialization versions.
This change leaves the following methods for snapshot/restore in
`StreamOperator`:
```
void snapshotState(
        FSDataOutputStream out,
        long checkpointId,
        long timestamp) throws Exception;

void restoreState(FSDataInputStream in) throws Exception;
```
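As a hedged illustration of what version-aware operator serialization could
look like with these two methods, the sketch below writes a format version
ahead of the state and checks it on restore. It uses plain `java.io` streams in
place of `FSDataOutputStream` and `FSDataInputStream` so that it runs without
Flink. The class `VersionedCounterStateSketch`, its `count` field, and the
version constant are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical operator state with a versioned serialization format.
final class VersionedCounterStateSketch {

    static final int FORMAT_VERSION = 1;

    long count;

    // Mirrors the shape of snapshotState, with OutputStream standing in
    // for FSDataOutputStream.
    void snapshotState(OutputStream out, long checkpointId, long timestamp) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(FORMAT_VERSION); // version header first
        data.writeLong(count);         // then the actual state
        data.flush();
    }

    // Mirrors the shape of restoreState, with InputStream standing in
    // for FSDataInputStream.
    void restoreState(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int version = data.readInt();
        if (version != FORMAT_VERSION) {
            throw new IOException("Unsupported state format version: " + version);
        }
        count = data.readLong();
    }

    public static void main(String[] args) throws IOException {
        VersionedCounterStateSketch original = new VersionedCounterStateSketch();
        original.count = 42L;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.snapshotState(buffer, 1L, System.currentTimeMillis());

        VersionedCounterStateSketch restored = new VersionedCounterStateSketch();
        restored.restoreState(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println("restored count = " + restored.count);
    }
}
```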
## c) Split task state into operator state (= non-partitioned state) and
keyed-state (= partitioned state)
We have split the operator state into operator state and keyed state as
follows.
Operator state is organized as a `ChainedStateHandle<StreamStateHandle>`.
The chained state handle encapsulates the individual `StreamStateHandle` for
all operators in an operator chain.
Keyed state is organized as a `List<KeyGroupsStateHandle>`. Each
`KeyGroupsStateHandle` consists of one `StreamStateHandle` and one
`KeyGroupRangeOffsets` object. `KeyGroupRangeOffsets` denotes the range of all
key groups that are referenced by the handle, together with their
respective stream offsets. The `StreamStateHandle` gives access to a seekable
stream that contains the actual state data for all key groups from the key
group range; the state of each individual key group is located in the stream
at its stream offset recorded in `KeyGroupRangeOffsets`.
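The relationship between the stream and the per-key-group offsets can be
sketched as follows. This is an illustration only: the class
`KeyGroupOffsetsSketch`, the use of strings as stand-in state, and an in-memory
byte array in place of a seekable stream are all assumptions, and the code does
not reproduce `KeyGroupRangeOffsets` or `KeyGroupsStateHandle`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of one stream holding the state of a key group range.
final class KeyGroupOffsetsSketch {

    // Writes one state blob per key group into the target stream and returns
    // the offset at which each key group's state begins.
    static long[] writeKeyGroups(String[] statePerKeyGroup, ByteArrayOutputStream target)
            throws IOException {
        DataOutputStream out = new DataOutputStream(target);
        long[] offsets = new long[statePerKeyGroup.length];
        for (int i = 0; i < statePerKeyGroup.length; i++) {
            offsets[i] = target.size(); // bytes written to the stream so far
            out.writeUTF(statePerKeyGroup[i]);
        }
        out.flush();
        return offsets;
    }

    // "Seeks" to the recorded offset of a key group and reads only its state.
    static String readKeyGroup(byte[] stream, int firstKeyGroup, long[] offsets, int keyGroup)
            throws IOException {
        int offset = (int) offsets[keyGroup - firstKeyGroup];
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(stream, offset, stream.length - offset));
        return in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        int firstKeyGroup = 4; // the handle covers key groups 4..6
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        long[] offsets = writeKeyGroups(new String[] {"a", "bb", "ccc"}, target);
        System.out.println(readKeyGroup(target.toByteArray(), firstKeyGroup, offsets, 5));
    }
}
```

Reading a single key group this way does not require deserializing the state
of the other key groups in the same stream.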
Note that we have to provide a list of `KeyGroupsStateHandle` to support
the case of scaling down parallelism. In this case, it can happen that key
group states from several `KeyGroupsStateHandle` (each representing the state
of one previously existing operator) have to be combined to form the keyed
state of a reduced number of current operators.
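A minimal sketch of the scale-down case, assuming that each old handle covers
one contiguous, inclusive key group range: for a new operator's range, it
determines which old handles overlap and which key groups to read from each.
The class `ScaleDownSketch` and its methods are hypothetical and only model the
range bookkeeping, not the actual state transfer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of combining several old handles for one new operator.
final class ScaleDownSketch {

    // Inclusive intersection of two key group ranges {first, last};
    // returns null if the ranges are disjoint.
    static int[] intersect(int[] a, int[] b) {
        int first = Math.max(a[0], b[0]);
        int last = Math.min(a[1], b[1]);
        return first <= last ? new int[] {first, last} : null;
    }

    // For each old handle that overlaps the new operator's range, returns
    // {handle index, first key group, last key group} to read from that handle.
    static List<int[]> partsToRead(List<int[]> oldHandleRanges, int[] newRange) {
        List<int[]> parts = new ArrayList<>();
        for (int i = 0; i < oldHandleRanges.size(); i++) {
            int[] overlap = intersect(oldHandleRanges.get(i), newRange);
            if (overlap != null) {
                parts.add(new int[] {i, overlap[0], overlap[1]});
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        // Old parallelism 4 over 8 key groups: one handle per old operator.
        List<int[]> oldRanges = Arrays.asList(
                new int[] {0, 1}, new int[] {2, 3}, new int[] {4, 5}, new int[] {6, 7});
        // New parallelism 2: the first new operator owns key groups 0..3.
        for (int[] part : partsToRead(oldRanges, new int[] {0, 3})) {
            System.out.println("handle " + part[0] + ": key groups " + part[1] + ".." + part[2]);
        }
    }
}
```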
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink keyed-state
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2376.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2376
----
commit 8a57da24b499e059fb73bd7050a96d32b57fcec4
Author: Till Rohrmann <[email protected]>
Date: 2016-07-28T13:08:24Z
[FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
This introduces a new KeySelector that assigns keys to key groups and
also adds the max parallelism parameter throughout all API levels.
This also adds tests for the newly introduced features.
commit 62fb798b762d8a69d30479561ed43b580facc600
Author: Stefan Richter <[email protected]>
Date: 2016-08-11T09:59:07Z
[FLINK-4381] Refactor State to Prepare For Key-Group State Backends
commit c038d6d9435c329ab4ca06c119cff5456924b5ab
Author: Till Rohrmann <[email protected]>
Date: 2016-08-11T10:14:18Z
[FLINK-4380] Add tests for new Key-Group/Max-Parallelism
This tests the rescaling features in CheckpointCoordinator and
SavepointCoordinator.
commit 751effb855a81e6322a7e897c98dc59ea065d072
Author: Aljoscha Krettek <[email protected]>
Date: 2016-08-12T09:07:09Z
Ignore RescalingITCase
----
> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
> Issue Type: New Feature
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> In order to support dynamic scaling, it is necessary to sub-partition the
> key-value states of each operator. This sub-partitioning, which produces a
> set of key groups, makes it easy to scale Flink jobs in and out by simply
> reassigning the different key groups to the new set of subtasks. The idea of
> key groups is described in this design document [1].
> [1]
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)