GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/1988
[FLINK-3761] Introduction of key groups
This pull request introduces the concept of key groups to Flink. A key
group is the smallest unit of key-value state that can be assigned to a stream
operator. In other words, it is a subset of the key space which is assigned to
a stream operator. With the introduction of key groups, it becomes possible to
dynamically scale Flink operators which use partitioned (i.e. key-value) state.
In order to support key groups, the following components were added/changed:
## Introduction of KeyGroupStateBackend
In order to make the state backends aware of the key groups, we had to
introduce a new type of state backend, the `KeyGroupStateBackend`. A key group
state backend behaves like the old `AbstractStateBackend`, except that it is
aware of key groups. This means that it generates one snapshot per key group
upon snapshotting.
In order to leverage the existing implementation of state backends, the
`AbstractStateBackend` was split up into the new `AbstractStateBackend` which
is responsible for managing non-partitioned state and the
`PartitionedStateBackend` which is responsible for managing the partitioned
state.
Furthermore, the PR comes with a `GenericKeyGroupStateBackend`
implementation, which is the standard implementation of the
`KeyGroupStateBackend`. The `GenericKeyGroupStateBackend` simply creates a
`PartitionedStateBackend` for every key group, which manages this key group.
Upon snapshotting, each `PartitionedStateBackend` is snapshotted.
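The delegation described above can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: the classes `PartitionedBackend` and `KeyGroupBackend` are hypothetical, heavily simplified stand-ins, the hash-modulo key assignment is an assumption, and a "snapshot" is just a copy of an in-memory map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GenericKeyGroupBackendSketch {

    /** Hypothetical, simplified partitioned backend: holds the key-value state of one key group. */
    static final class PartitionedBackend<K, V> {
        private final Map<K, V> state = new HashMap<>();

        void put(K key, V value) { state.put(key, value); }

        V get(K key) { return state.get(key); }

        /** In this sketch a snapshot is just a copy of the in-memory map. */
        Map<K, V> snapshot() { return new HashMap<>(state); }
    }

    /** Hypothetical key-group-aware backend: one partitioned backend per key group. */
    static final class KeyGroupBackend<K, V> {
        private final List<PartitionedBackend<K, V>> backends = new ArrayList<>();

        KeyGroupBackend(int numberKeyGroups) {
            if (numberKeyGroups <= 0) {
                throw new IllegalArgumentException("numberKeyGroups must be positive");
            }
            for (int i = 0; i < numberKeyGroups; i++) {
                backends.add(new PartitionedBackend<>());
            }
        }

        /** Assumption: keys are assigned to key groups by hash modulo the number of key groups. */
        int keyGroupOf(K key) {
            return Math.floorMod(Objects.hashCode(key), backends.size());
        }

        void put(K key, V value) { backends.get(keyGroupOf(key)).put(key, value); }

        V get(K key) { return backends.get(keyGroupOf(key)).get(key); }

        /** Snapshots every partitioned backend, yielding one snapshot per key group. */
        List<Map<K, V>> snapshot() {
            List<Map<K, V>> snapshots = new ArrayList<>();
            for (PartitionedBackend<K, V> backend : backends) {
                snapshots.add(backend.snapshot());
            }
            return snapshots;
        }
    }

    public static void main(String[] args) {
        KeyGroupBackend<Integer, String> backend = new KeyGroupBackend<>(4);
        backend.put(1, "a");
        backend.put(5, "b");
        backend.put(2, "c");
        // Keys 1 and 5 share key group 1, key 2 lives in key group 2.
        System.out.println(backend.snapshot());
    }
}
```

Because every key group has its own snapshot, a restored job can hand individual key groups to different operator instances without splitting a snapshot.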
## Introduction of KeyGroupAssigner
In order to assign keys to a key group, the `KeyGroupAssigner` interface is
introduced. The `KeyGroupAssigner` implementation is used by the
`GenericKeyGroupStateBackend` to select the proper `PartitionedStateBackend`
for the current key. Furthermore, the former `HashPartitioner`, which is now
renamed to `KeyGroupStreamPartitioner`, uses the `KeyGroupAssigner` to
distribute the streaming records consistently with respect to the key group
mappings. The key groups themselves are mapped to the downstream tasks in a
round-robin fashion (key group index modulo #channels = outgoing channel).
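As a rough illustration of the two-step routing (key to key group, key group to channel), consider the following sketch. The interface shape and the method names `getKeyGroupIndex` and `selectChannel` are assumptions made for this example, and the hash-modulo assignment is only one plausible implementation. Only the modulo mapping from key group to channel is taken from the description above.

```java
import java.util.Objects;

public class KeyGroupRoutingSketch {

    /** Hypothetical stand-in for the KeyGroupAssigner interface. */
    interface KeyGroupAssigner<K> {
        int getKeyGroupIndex(K key);
    }

    /** Assumption: key group = non-negative hash of the key modulo the number of key groups. */
    static final class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
        private final int numberKeyGroups;

        HashKeyGroupAssigner(int numberKeyGroups) {
            if (numberKeyGroups <= 0) {
                throw new IllegalArgumentException("numberKeyGroups must be positive");
            }
            this.numberKeyGroups = numberKeyGroups;
        }

        @Override
        public int getKeyGroupIndex(K key) {
            // floorMod keeps the result non-negative even for negative hash codes.
            return Math.floorMod(Objects.hashCode(key), numberKeyGroups);
        }
    }

    /** Round-robin mapping from the description: key group index modulo #channels. */
    static int selectChannel(int keyGroupIndex, int numberOfChannels) {
        return keyGroupIndex % numberOfChannels;
    }

    public static void main(String[] args) {
        KeyGroupAssigner<Integer> assigner = new HashKeyGroupAssigner<>(4);
        int keyGroup = assigner.getKeyGroupIndex(7); // 7 mod 4 = 3
        int channel = selectChannel(keyGroup, 2);    // 3 mod 2 = 1
        System.out.println("key group " + keyGroup + " -> channel " + channel);
    }
}
```

Since the partitioner and the state backend consult the same assigner, a record always arrives at the task that holds the state of its key group.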
## Introduction of MaxParallelism to user API
In order to scale programs up or down, it is necessary to define the
maximum number of key groups. The maximum number of key groups denotes the
maximum parallelism of an operator, because every parallel operator instance
needs at least one key group to get elements assigned. To specify this upper
limit, the `ExecutionConfig` allows setting a job-wide max parallelism value via
`ExecutionConfig.setMaxParallelism`. In addition, the
`SingleOutputStreamOperator` allows setting a max parallelism value per
operator, similar to the parallelism. If the max parallelism has not been set
and there is no job-wide max parallelism set, the parallelism of the operator
will be taken as the max parallelism. Thus, every parallel operator instance
would then receive a single key group.
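The fallback rule can be sketched as a small helper. The method `resolveMaxParallelism` and the `UNSET` sentinel are hypothetical names, and the precedence of the operator-level value over the job-wide value is an assumption. The description only states that the operator's parallelism is used when neither value is set.

```java
public class MaxParallelismResolutionSketch {

    /** Hypothetical sentinel meaning "no value configured". */
    static final int UNSET = -1;

    /**
     * Resolves the effective max parallelism (= number of key groups) of an operator.
     * Assumed precedence: operator-level value, then job-wide value, then the
     * operator's parallelism as the fallback.
     */
    static int resolveMaxParallelism(int operatorMax, int jobWideMax, int parallelism) {
        if (operatorMax > 0) {
            return operatorMax;
        }
        if (jobWideMax > 0) {
            return jobWideMax;
        }
        return parallelism;
    }

    public static void main(String[] args) {
        // Nothing configured: 4 parallel instances share 4 key groups, one each.
        System.out.println(resolveMaxParallelism(UNSET, UNSET, 4));
        // Job-wide value of 128 configured: 4 instances share 128 key groups.
        System.out.println(resolveMaxParallelism(UNSET, 128, 4));
    }
}
```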
However, in order to scale jobs up and down, one has to set a max
parallelism for the operators which shall be scaled up/down. The upper limit
for the scaling is the max parallelism value. The max parallelism must not
change when scaling the job, because changing it would destroy the mapping of
keys to key groups.
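A small worked example may help to see why the max parallelism has to stay fixed. The sketch below assumes a hash-modulo assignment of keys to key groups (an assumption, not necessarily what the PR implements) and uses the modulo mapping of key groups to channels from the description. The helper names are hypothetical.

```java
import java.util.Objects;

public class RescalingSketch {

    /** Assumption: key group = non-negative hash of the key modulo the max parallelism. */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    /** Key group index modulo the current parallelism gives the task (channel). */
    static int channel(int keyGroup, int parallelism) {
        return keyGroup % parallelism;
    }

    public static void main(String[] args) {
        Integer key = 7;

        // Fixed max parallelism of 8: the key stays in key group 7 while scaling.
        int group = keyGroup(key, 8);
        System.out.println("parallelism 2: key group " + group + " -> task " + channel(group, 2));
        System.out.println("parallelism 4: key group " + group + " -> task " + channel(group, 4));

        // Changing the max parallelism to 6 moves the key to key group 1, so state
        // snapshotted under key group 7 would no longer be found for this key.
        System.out.println("max parallelism 6: key group " + keyGroup(key, 6));
    }
}
```

Rescaling with a fixed max parallelism only moves whole key groups between tasks, so the per-key-group snapshots can be redistributed as they are.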
- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink keyGroups2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1988.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1988
----
commit ea0ca6ac53bd8f34fb8d6b1684f6db14a35ca1f0
Author: Till Rohrmann <[email protected]>
Date: 2016-03-14T08:41:08Z
Introduce KeyGroupAssigner, KeyGroupStateBackend and KeyGroupKVState
commit 4bfdece4c9f823da1ad6fe8a894b09fbd0e8101f
Author: Till Rohrmann <[email protected]>
Date: 2016-04-07T12:05:34Z
Add key group state handles to AcknowledgeCheckpoint message
commit 19aa0d62e60a9a148b5b8a145e5e95099d149799
Author: Till Rohrmann <[email protected]>
Date: 2016-04-07T12:06:41Z
Rename StreamTaskState -> StreamOperatorState and StreamTaskStateList ->
StreamTaskState
commit af2dfb01fc4951cf3ef548a210de657702f2f506
Author: Till Rohrmann <[email protected]>
Date: 2016-04-07T14:48:49Z
Introduce KeyGroupState to StreamTask, StreamOperator and StateBackend
commit 32e705555f737a6b9971946bf2564309e42d4ea3
Author: Till Rohrmann <[email protected]>
Date: 2016-04-15T10:05:44Z
Introduce KeyGroupStateBackend
commit 16d8111260dba5edfb507dfb06b71c5ea500c7ba
Author: Till Rohrmann <[email protected]>
Date: 2016-04-28T16:50:12Z
Adapt test cases to work with refactored state backends
commit 088b5f9b37a7950125640b05e85231c70e501423
Author: Till Rohrmann <[email protected]>
Date: 2016-04-28T16:50:40Z
Refactor RocksDbStateBackend into PartitionedRocksDbStateBackend and
RocksDbStateBackend
commit f29d9a64181d2d2af59a4172d9a492d7d50e1c69
Author: Till Rohrmann <[email protected]>
Date: 2016-05-02T08:51:10Z
Introduce partitioned and non-partitioned stream operator state
commit 2dd69e32b2ee0523bd18383f4c10072598158ba7
Author: Till Rohrmann <[email protected]>
Date: 2016-05-03T16:57:21Z
Forward key group state to TDD
commit 45501c55fe76d5536668c1c4a1ca0bfd62ffa606
Author: Till Rohrmann <[email protected]>
Date: 2016-05-03T17:30:12Z
Remove state descriptor generic parameter from KvState and KvStateSnapshot
commit f52ae87f831b62a7c596359fa779485e0231fee7
Author: Till Rohrmann <[email protected]>
Date: 2016-05-03T17:43:20Z
Remove state generic parameter from KvState and KvStateSnapshot
commit 76e3db5a358857e43f8e03abdae28c476a68fc50
Author: Till Rohrmann <[email protected]>
Date: 2016-05-03T17:44:49Z
Remove unused state descriptor from RocksDBReducingState
commit 6b8ad068b5d53e841e94debd7376313ce47c0a08
Author: Till Rohrmann <[email protected]>
Date: 2016-05-04T14:42:01Z
Send key group state size to checkpoint coordinator
commit 62cf90df1da9a8724eb24713227318f4557c4915
Author: Till Rohrmann <[email protected]>
Date: 2016-05-04T14:52:11Z
Add sanity check for number of key groups
commit 1a45cbcdbecd90d35457269fff6ddaa7e3889902
Author: Till Rohrmann <[email protected]>
Date: 2016-05-04T18:41:27Z
Add max parallelism to job vertex and execution job vertex
commit acfee09eb66a35f61dd7e2d6912259de6f70e8e4
Author: Till Rohrmann <[email protected]>
Date: 2016-05-06T12:11:52Z
Add setMaxParallelism to SingleOutputStreamOperator
If max parallelism is not set, then the parallelism is chosen as the max
parallelism.
commit 567ccb5fbd3cb9cb12acbc812ebda08b20cf71cf
Author: Till Rohrmann <[email protected]>
Date: 2016-05-06T13:29:47Z
Add documentation
commit 2af688f7e2e5f99260755e90c1424e910cbd19d6
Author: Till Rohrmann <[email protected]>
Date: 2016-05-06T15:35:40Z
Add setMaxParallelism to Scala API
Set parallelism in StreamGraphGenerator if no default parallelism has been
set
commit b7b302382718531f7395f0c06e426f616a8e5146
Author: Till Rohrmann <[email protected]>
Date: 2016-05-09T12:37:09Z
Remove disposeAllStateForCurrentJob from AbstractStateBackend
commit c8dacbab2022ec216174ddf9dd5e2bddfcf7be11
Author: Till Rohrmann <[email protected]>
Date: 2016-05-10T19:57:01Z
Add RescalingITCase
commit 7218af747db9f098b07f8375539340348a4b2146
Author: Till Rohrmann <[email protected]>
Date: 2016-05-11T07:53:52Z
Rework CheckpointCoordinator to support rescaling
commit c81d88e2908a1f47cfa15b472e5efc871606ccc5
Author: Till Rohrmann <[email protected]>
Date: 2016-05-11T08:21:10Z
Test rescaling job with non-partitioned state
commit 7f05674406175c9ba6b98378aa21efee35a2c746
Author: Till Rohrmann <[email protected]>
Date: 2016-05-11T08:33:53Z
Introduce
SavepointCoordinatorDeActivator/CheckpointCoordinatorDeActivatorCreator to
resolve warning
commit c0eca81a8f1ac4167b0b3c6a325256456d45652c
Author: Till Rohrmann <[email protected]>
Date: 2016-05-11T08:57:22Z
Add tests for HashKeyGroupAssigner
commit e6a4122342f11894e99f93c964919853fa5b04f2
Author: Till Rohrmann <[email protected]>
Date: 2016-05-11T11:48:16Z
Add GenericKeyGroupStateBackendTest
commit 4455f9861a90d4d99e1a2fe963e89cbe47b264d5
Author: Till Rohrmann <[email protected]>
Date: 2016-05-11T15:52:17Z
Add tests for configuring the KeyGroupStreamPartitioner
commit c83297936d4795dd8d6bc84654122b4f0cdc64ae
Author: Till Rohrmann <[email protected]>
Date: 2016-05-11T16:13:59Z
Add StreamingJobGraphGeneratorTest
Add test cases for connected streams
commit 5519ff559b9846196389b04bafeeb886266c0881
Author: Till Rohrmann <[email protected]>
Date: 2016-05-12T09:14:13Z
Set proper access modifiers for CheckpointCoordinator
commit 6a36ee70f159ffdd69e81fcd202ba2fdbb743b3d
Author: Till Rohrmann <[email protected]>
Date: 2016-05-12T09:59:54Z
Replace polling from StreamTaskTest
commit 9f2d01aaa03ebdbad3d1e62d164dd406ca364517
Author: Till Rohrmann <[email protected]>
Date: 2016-05-12T17:12:24Z
Test checkpointing and recovery of StreamTask
----