[
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15449027#comment-15449027
]
ASF GitHub Bot commented on FLINK-3755:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/2440
Keyed backend refactor
This pull request is a follow-up to the preliminary pull request #2376 and
addresses all issues subsumed under [FLINK-3755].
In addition to the changes from PR #2376, this PR adds:
# 1) Refactoring of Key Value State
Before, `AbstractStateBackend` was responsible both for checkpointing to streams and
for keyed state. Now, this functionality is split up into `CheckpointStreamFactory` and
`KeyedStateBackend`. The former is responsible for providing streams for writing
checkpoints, while the latter is only responsible for keeping keyed state. A
`KeyedStateBackend` can write its content to a checkpoint. For this it uses a
`CheckpointStreamFactory`.
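The split can be illustrated with a minimal sketch. The names `SketchStreamFactory` and `SketchKeyedBackend` are hypothetical and heavily simplified; they are not the interfaces from this PR and only show the division of responsibilities, assuming string keys and long values.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for CheckpointStreamFactory: it only hands out streams.
interface SketchStreamFactory {
    OutputStream createStream(long checkpointId);
}

// Hypothetical stand-in for KeyedStateBackend: it only keeps keyed state and
// writes it to whatever stream the factory provides.
final class SketchKeyedBackend {
    private final Map<String, Long> state = new TreeMap<>();

    void put(String key, long value) {
        state.put(key, value);
    }

    /** Writes all entries to a stream from the factory and returns the bytes written. */
    int snapshot(long checkpointId, SketchStreamFactory factory) {
        try (DataOutputStream out = new DataOutputStream(factory.createStream(checkpointId))) {
            out.writeInt(state.size());
            for (Map.Entry<String, Long> entry : state.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
            return out.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the backend does not know whether the stream ends up in a file or in a byte array; that decision lives entirely in the factory.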
# 2) Introduction of key-group aware `KeyedStateBackend`s
## a) HeapKeyedStateBackend
`HeapKeyedStateBackend` subsumes the keyed state part of both `FsStateBackend` and
`MemoryStateBackend`. The only difference between the two now is that one produces a
`CheckpointStreamFactory` that provides streams for writing to files, while the other
provides streams that write to in-memory byte arrays.
Also, this introduces another layer of lookup in the `HeapKeyedStateBackend` to
accommodate storing state per key group. Upon checkpointing, the data is written out in
a format that is very similar to that of the new RocksDB backend. We should make these
100% compatible as a follow-up.
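As a rough illustration of the additional lookup layer, the sketch below keeps one map per key group. The class `KeyGroupedHeapState` and its hash-modulo assignment are assumptions made for this example, not the data structure used by `HeapKeyedStateBackend`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keyed state held in one map per key group.
final class KeyGroupedHeapState<K, V> {
    private final int numberOfKeyGroups;
    // Outer lookup: key-group index -> (key -> value).
    private final Map<Integer, Map<K, V>> groups = new HashMap<>();

    KeyGroupedHeapState(int numberOfKeyGroups) {
        if (numberOfKeyGroups <= 0) {
            throw new IllegalArgumentException("numberOfKeyGroups must be positive");
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    // Assumption: plain hashCode modulo the number of key groups.
    int keyGroupOf(K key) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    void put(K key, V value) {
        groups.computeIfAbsent(keyGroupOf(key), g -> new HashMap<>()).put(key, value);
    }

    V get(K key) {
        Map<K, V> group = groups.get(keyGroupOf(key));
        return group == null ? null : group.get(key);
    }

    // Everything needed to write out or hand over a single key group.
    Map<K, V> entriesOfKeyGroup(int keyGroup) {
        return groups.getOrDefault(keyGroup, Collections.emptyMap());
    }
}
```

With this layout a snapshot can iterate key group by key group, which is what makes it possible to reassign whole groups on restore.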
## b) RocksDBKeyedStateBackend
The RocksDB state backend is now also key-group aware. This happens through an
additional 1-2 byte key-group prefix that is added to each key. On snapshots, the
key-value states for different key groups are combined through `RocksDBMergeIterator`.
All snapshots from this backend now run fully asynchronously, using an implementation of
`AbstractAsyncIOCallable`.
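The key-group prefix can be pictured with the following sketch. `KeyGroupPrefixSketch` is a hypothetical helper; the threshold of 256 for switching from one to two bytes and the big-endian layout are assumptions for illustration, since the exact rule is not stated here.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of prepending a 1-2 byte key-group prefix to a serialized key.
final class KeyGroupPrefixSketch {

    // Assumption: one unsigned byte is enough for up to 256 key groups.
    static int prefixLength(int numberOfKeyGroups) {
        return numberOfKeyGroups > 256 ? 2 : 1;
    }

    // Builds [key-group prefix | serialized key], most significant prefix byte first.
    static byte[] prefixedKey(int keyGroup, int prefixLength, byte[] serializedKey) {
        ByteBuffer buffer = ByteBuffer.allocate(prefixLength + serializedKey.length);
        if (prefixLength == 2) {
            buffer.put((byte) (keyGroup >>> 8));
        }
        buffer.put((byte) keyGroup);
        buffer.put(serializedKey);
        return buffer.array();
    }
}
```

Because every entry of a key group shares the same leading bytes, all entries of one group can be addressed by that common prefix.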
# 3) Refactoring of asynchronous snapshot facilities
Snapshots are now driven by `AsyncCheckpointRunnable`s in `StreamTask`, which are
executed through a thread pool. An `AsyncCheckpointRunnable` is created with a
`RunnableFuture<KeyGroupsStateHandle>` that is obtained from `KeyedStateBackend` through
```
public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception;
```
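To show how a `RunnableFuture` lends itself to this pattern, here is a minimal sketch using only `java.util.concurrent`. `AsyncSnapshotSketch` and its string "state handle" are hypothetical placeholders; the real `AsyncCheckpointRunnable` and `KeyGroupsStateHandle` are not modeled.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

final class AsyncSnapshotSketch {

    // Hypothetical stand-in for snapshot(...): nothing is computed until run() is called.
    static RunnableFuture<String> snapshot(long checkpointId) {
        return new FutureTask<>(() -> "state-handle-" + checkpointId);
    }

    // Runs the snapshot on a pool thread and returns its result.
    static String runAsync(long checkpointId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            RunnableFuture<String> future = snapshot(checkpointId);
            pool.execute(future);   // the pool thread performs the actual work
            return future.get();    // blocking here only to keep the demo short
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller that obtains the future decides when and where it runs, so the task thread can continue processing while a pool thread writes the snapshot.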
# Review comments on #2376
Based on the review comments on PR #2376, we introduced the following changes:
## In comparison to PR #2376, we dropped the KeyGroupAssigner interface in favor of static methods.
The reason for this is that the code relies on a consistent key group
assignment in several places.
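A static assignment could look roughly like the sketch below. The class and method names are hypothetical, and the hash-modulo and proportional mapping are assumptions for illustration; the point is only that every caller computes the same result without needing a configurable assigner instance.

```java
// Hypothetical sketch of static, stateless key-group assignment.
final class KeyGroupAssignmentSketch {

    // Assumption: key group = hash of the key modulo the max parallelism.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Assumption: key groups are spread over subtasks in contiguous ranges.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

Since both methods depend only on their arguments, a key maps to the same key group at any parallelism, and only the mapping from key group to subtask changes when a job is rescaled.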
## By default, the max parallelism is chosen as
```
Math.min(128, roundToNextPowerOfTwo(parallelism + parallelism / 2))
```
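For concreteness, the expression can be evaluated with the sketch below. It implements the formula exactly as quoted; `roundToNextPowerOfTwo` is a hypothetical helper written for this example, assuming that an exact power of two is returned unchanged.

```java
// Hypothetical sketch that evaluates the quoted default max parallelism expression.
final class DefaultMaxParallelismSketch {

    // Assumption: returns the smallest power of two that is >= x (and 1 for x <= 1).
    static int roundToNextPowerOfTwo(int x) {
        if (x <= 1) {
            return 1;
        }
        return Integer.highestOneBit(x - 1) << 1;
    }

    static int defaultMaxParallelism(int parallelism) {
        return Math.min(128, roundToNextPowerOfTwo(parallelism + parallelism / 2));
    }
}
```

Under these assumptions a parallelism of 4 gives 8 (4 + 2 = 6, rounded up to 8), while a parallelism of 100 gives 128 (150 rounds up to 256, which the `Math.min` reduces to 128). Read literally, the expression never yields more than 128.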
## No blocking on the termination of async snapshot threads.
## Reduced logging.
# Limitations
Currently, queryable state is not key-group aware and `QueryableStateITCase` is
ignored. This will be solved in follow-up work.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink keyed-backend-refactor
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2440.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2440
----
commit 40484f3a66558b40bcf5bcaae3e3dba28d73f8dd
Author: Till Rohrmann <[email protected]>
Date: 2016-07-28T13:08:24Z
[FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
This introduces a new KeySelector that assigns keys to key groups and
also adds the max parallelism parameter throughout all API levels.
This also adds tests for the newly introduced features.
commit 3609f29076dd504ab9790d874fe3e3d4828f6b77
Author: Aljoscha Krettek <[email protected]>
Date: 2016-08-10T16:44:50Z
[FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware
The biggest change in this is that functionality that used to be in
AbstractStateBackend is now moved to CheckpointStreamFactory and
KeyedStateBackend. The former is responsible for providing streams that
can be used to checkpoint data while the latter is responsible for
keeping keyed state. A keyed backend can checkpoint the state that it
keeps by using a CheckpointStreamFactory.
This also refactors how asynchronous keyed state snapshots work. They
are now implemented using a Future/RunnableFuture.
Also, this changes the keyed state backends to be key-group aware and to
snapshot the state in key-groups with an index for restoring.
commit 9d675ca0707b31923108ea78908b63fc46798c97
Author: Stefan Richter <[email protected]>
Date: 2016-08-11T09:59:07Z
[FLINK-4381] Refactor State to Prepare For Key-Group State Backends
commit d99b75c70bb15fe6ee5c06968d92b075e9b6c772
Author: Till Rohrmann <[email protected]>
Date: 2016-08-11T10:14:18Z
[FLINK-4380] Add tests for new Key-Group/Max-Parallelism
This tests the rescaling features in CheckpointCoordinator and
SavepointCoordinator.
commit 1d514b8d5a8db663e1be293b9653c42d45787e36
Author: Stefan Richter <[email protected]>
Date: 2016-08-17T12:50:18Z
[FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware
This change makes the RocksDB backend key-group aware by building on the
changes in the previous commit.
commit 4f791d17f727a2fead12224652747b73d98ffaa1
Author: Aljoscha Krettek <[email protected]>
Date: 2016-08-25T12:09:12Z
Ignore QueryableStateITCase
This doesn't work yet because the state query machinery is not yet
properly aware of key-grouped state.
commit f47af43bd7b5e54086ab2ce87b9471cb99a38421
Author: Stefan Richter <[email protected]>
Date: 2016-08-29T08:02:31Z
Introduced timeout of thread pool for testing. Removed legacy code path from
HashKeyGroupAssigner
commit 52061346e39cbc591e79bad2b6a4d9ce272bb558
Author: Stefan Richter <[email protected]>
Date: 2016-08-29T09:53:22Z
Stephan's feedback: remove KeyGroupAssigner in favor of a static method and
have default max. parallelism at 128
commit d77475a69dfdfae092107562af5c96af8de370cb
Author: Stefan Richter <[email protected]>
Date: 2016-08-29T14:10:15Z
Improved test stability
commit 1252a246bee6bc181b2def83966d121b1ca5688e
Author: Stefan Richter <[email protected]>
Date: 2016-08-30T10:26:48Z
Improved HeapKeyedStateBackend for more compact snapshots.
commit 3f282619c6dbf48a246bf848be2e92a4cad8bd2d
Author: Stefan Richter <[email protected]>
Date: 2016-08-30T11:45:40Z
Expose max parallelism through StreamExecutionEnvironment
commit 1fa0e02f33722029159b39625127758fcb3623d3
Author: Stefan Richter <[email protected]>
Date: 2016-08-30T12:47:34Z
test fix
commit 257992bf3ead38d12775b077478b84f8690c7fb9
Author: Stefan Richter <[email protected]>
Date: 2016-08-30T12:59:12Z
Extended EventTimeWindowCheckpointITCase to test the boundaries of
maxParallelism.
commit ea26c0f2e9f1687b5ae89a1acaeaea681c19bd80
Author: Stefan Richter <[email protected]>
Date: 2016-08-30T13:27:25Z
Reset WindowCheckpointingITCase to (sometimes) failing version.
----
> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
> Issue Type: New Feature
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> In order to support dynamic scaling, it is necessary to sub-partition the
> key-value states of each operator. This sub-partitioning, which produces a
> set of key groups, makes it easy to scale Flink jobs in and out by simply
> reassigning the different key groups to the new set of subtasks. The idea of
> key groups is described in this design document [1].
> [1]
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing