GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1988

    [FLINK-3761] Introduction of key groups

    This pull request introduces the concept of key groups to Flink. A key 
group is the smallest assignable unit of key-value state to a stream operator. 
Put differently, it is a subset of the key space which is assigned to a 
stream operator. With the introduction of key groups, it will be possible to 
dynamically scale Flink operators which use partitioned (=key-value) state.
    
    In order to support key groups, the following components were added/changed:
    
    ## Introduction of KeyGroupStateBackend
    
    In order to make the state backends aware of the key groups, we had to 
introduce a new type of state backend, `KeyGroupStateBackend`. A key group state 
backend behaves like the old `AbstractStateBackend`, except that it is aware of 
key groups: it generates a separate snapshot for each key group upon 
snapshotting.
    
    In order to leverage the existing implementation of state backends, the 
`AbstractStateBackend` was split up into the new `AbstractStateBackend`, which 
is responsible for managing non-partitioned state, and the 
`PartitionedStateBackend`, which is responsible for managing the partitioned 
state.
    
    Furthermore, the PR comes with a `GenericKeyGroupStateBackend`, which is 
the standard implementation of the `KeyGroupStateBackend`. The 
`GenericKeyGroupStateBackend` simply creates one `PartitionedStateBackend` per 
key group, which manages that key group. Upon snapshotting, each 
`PartitionedStateBackend` is snapshotted.
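    
    The delegation described above can be sketched roughly as follows. This is 
only an illustration: the class names `KeyGroupBackendSketch` and 
`PartitionedBackendSketch`, their methods, and the hash-based lookup are 
assumptions made for the example and are not the classes added by this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a key-group-aware backend that keeps one
// partitioned backend per key group. Not the PR's actual implementation.
class KeyGroupBackendSketch {
    private final PartitionedBackendSketch[] backends;

    KeyGroupBackendSketch(int numberOfKeyGroups) {
        backends = new PartitionedBackendSketch[numberOfKeyGroups];
        for (int i = 0; i < numberOfKeyGroups; i++) {
            backends[i] = new PartitionedBackendSketch();
        }
    }

    // Assumption: keys are assigned to key groups by hashing.
    int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), backends.length);
    }

    // Route the write to the backend that manages the key's key group.
    void put(String key, long value) {
        backends[keyGroupOf(key)].put(key, value);
    }

    // Produce one snapshot per key group, indexed by key group.
    Map<Integer, Map<String, Long>> snapshot() {
        Map<Integer, Map<String, Long>> snapshots = new HashMap<>();
        for (int i = 0; i < backends.length; i++) {
            snapshots.put(i, backends[i].snapshot());
        }
        return snapshots;
    }

    public static void main(String[] args) {
        KeyGroupBackendSketch backend = new KeyGroupBackendSketch(4);
        backend.put("a", 1L);
        backend.put("b", 2L);
        System.out.println(backend.snapshot());
    }
}

// Hypothetical stand-in for a backend that manages a single key group.
class PartitionedBackendSketch {
    private final Map<String, Long> state = new HashMap<>();

    void put(String key, long value) {
        state.put(key, value);
    }

    // A snapshot here is simply a copy of the current contents.
    Map<String, Long> snapshot() {
        return new HashMap<>(state);
    }
}
```

    Because every key group has its own snapshot, a restored job could hand 
individual key group snapshots to different operator instances.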
    
    ## Introduction of KeyGroupAssigner
    
    In order to assign keys to a key group, the `KeyGroupAssigner` interface is 
introduced. The `KeyGroupAssigner` implementation is used by the 
`GenericKeyGroupStateBackend` to select the proper `PartitionedStateBackend` 
for the current key. Furthermore, the former `HashPartitioner`, which is now 
renamed to `KeyGroupStreamPartitioner`, uses the `KeyGroupAssigner` to 
distribute the streaming records consistently with respect to the key group 
mappings. The key groups themselves are mapped to the downstream tasks in a 
round-robin fashion (key group index modulo #channels = outgoing channel).
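    
    As a rough illustration of the two-step routing (key to key group, then 
key group to channel), consider the sketch below. The class `KeyGroupRouting` 
and its method names are hypothetical, and hashing the key is an assumption 
about how an assigner might work; only the modulo step is taken from the 
description above.

```java
import java.util.Objects;

// Hypothetical sketch of key -> key group -> channel routing.
class KeyGroupRouting {

    // Assumption: the key group is derived from the key's hash code.
    // Math.floorMod keeps the result in [0, numberOfKeyGroups) even for
    // negative hash codes.
    static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        return Math.floorMod(Objects.hashCode(key), numberOfKeyGroups);
    }

    // Round-robin mapping from the description:
    // key group index modulo #channels = outgoing channel.
    static int selectChannel(int keyGroupIndex, int numberOfChannels) {
        return keyGroupIndex % numberOfChannels;
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 8;
        int numberOfChannels = 3;
        for (String key : new String[] {"a", "b", "c"}) {
            int keyGroup = assignToKeyGroup(key, numberOfKeyGroups);
            int channel = selectChannel(keyGroup, numberOfChannels);
            System.out.println(key + " -> key group " + keyGroup + " -> channel " + channel);
        }
    }
}
```

    Since the partitioner and the state backend would both consult the same 
assignment, a record arrives at the task that also holds the state of its key 
group.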
    
    ## Introduction of MaxParallelism to user API
    
    In order to scale programs up or down, it is necessary to define the 
maximum number of key groups. The maximum number of key groups denotes the 
maximum parallelism of an operator, because every parallel operator instance 
needs at least one key group to get elements assigned. To specify this upper 
limit, the `ExecutionConfig` allows setting a job-wide max parallelism value via 
`ExecutionConfig.setMaxParallelism`. In addition, the 
`SingleOutputStreamOperator` allows setting a max parallelism value per 
operator, analogous to the parallelism. If neither an operator-level nor a 
job-wide max parallelism has been set, the parallelism of the operator will be 
taken as the max parallelism. Every parallel operator instance would then 
receive a single key group.
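    
    The fallback rule above could be expressed along these lines. The class 
`MaxParallelismResolution`, the `UNSET` sentinel, and the precedence of the 
operator-level value over the job-wide value are assumptions made for this 
sketch; the description only states the final fallback to the parallelism.

```java
// Hypothetical sketch of how an effective max parallelism might be chosen.
class MaxParallelismResolution {

    // Hypothetical marker for "no value configured".
    static final int UNSET = -1;

    static int resolveMaxParallelism(
            int operatorMaxParallelism, int jobMaxParallelism, int parallelism) {
        // Assumption: an operator-level setting wins over the job-wide one.
        if (operatorMaxParallelism != UNSET) {
            return operatorMaxParallelism;
        }
        if (jobMaxParallelism != UNSET) {
            return jobMaxParallelism;
        }
        // Nothing configured: fall back to the operator's parallelism, so
        // each parallel instance ends up with exactly one key group.
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolveMaxParallelism(128, 64, 4));
        System.out.println(resolveMaxParallelism(UNSET, 64, 4));
        System.out.println(resolveMaxParallelism(UNSET, UNSET, 4));
    }
}
```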
    
    However, in order to scale jobs up and down, one has to set a max 
parallelism for the operators which shall be scaled up/down. The max 
parallelism value is the upper limit for the scaling. The max parallelism must 
not change when scaling the job, because changing it would destroy the mapping 
of keys to key groups.
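    
    A small sketch may help show why the max parallelism has to stay fixed. 
It assumes a hash-based assignment of keys to key groups (the class 
`MaxParallelismStability` and its methods are hypothetical): changing the 
parallelism only moves key groups between instances, whereas changing the max 
parallelism moves keys between key groups.

```java
// Hypothetical sketch: effect of rescaling on the key -> key group mapping.
class MaxParallelismStability {

    // Assumption: key group = hash of the key modulo the max parallelism.
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Round-robin placement of a key group on a parallel instance.
    static int instanceOf(int keyGroup, int parallelism) {
        return keyGroup % parallelism;
    }

    public static void main(String[] args) {
        String key = "k";

        // Same max parallelism, different parallelism: the key group is
        // unchanged, only the instance that owns it differs.
        int keyGroup = keyGroupOf(key, 8);
        System.out.println("key group " + keyGroup
                + ", instance at parallelism 2: " + instanceOf(keyGroup, 2)
                + ", instance at parallelism 4: " + instanceOf(keyGroup, 4));

        // Different max parallelism: the key lands in another key group, so
        // state snapshotted per key group would no longer match its keys.
        System.out.println("key group with max parallelism 16: " + keyGroupOf(key, 16));
    }
}
```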
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the 
JIRA id)
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink keyGroups2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1988.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1988
    
----
commit ea0ca6ac53bd8f34fb8d6b1684f6db14a35ca1f0
Author: Till Rohrmann <[email protected]>
Date:   2016-03-14T08:41:08Z

    Introduce KeyGroupAssigner, KeyGroupStateBackend and KeyGroupKVState

commit 4bfdece4c9f823da1ad6fe8a894b09fbd0e8101f
Author: Till Rohrmann <[email protected]>
Date:   2016-04-07T12:05:34Z

    Add key group state handles to AcknowledgeCheckpoint message

commit 19aa0d62e60a9a148b5b8a145e5e95099d149799
Author: Till Rohrmann <[email protected]>
Date:   2016-04-07T12:06:41Z

    Rename StreamTaskState -> StreamOperatorState and StreamTaskStateList -> 
StreamTaskState

commit af2dfb01fc4951cf3ef548a210de657702f2f506
Author: Till Rohrmann <[email protected]>
Date:   2016-04-07T14:48:49Z

    Introduce KeyGroupState to StreamTask, StreamOperator and StateBackend

commit 32e705555f737a6b9971946bf2564309e42d4ea3
Author: Till Rohrmann <[email protected]>
Date:   2016-04-15T10:05:44Z

    Introduce KeyGroupStateBackend

commit 16d8111260dba5edfb507dfb06b71c5ea500c7ba
Author: Till Rohrmann <[email protected]>
Date:   2016-04-28T16:50:12Z

    Adapt test cases to work with refactored state backends

commit 088b5f9b37a7950125640b05e85231c70e501423
Author: Till Rohrmann <[email protected]>
Date:   2016-04-28T16:50:40Z

    Refactor RocksDbStateBackend into PartitionedRocksDbStateBackend and 
RocksDbStateBackend

commit f29d9a64181d2d2af59a4172d9a492d7d50e1c69
Author: Till Rohrmann <[email protected]>
Date:   2016-05-02T08:51:10Z

    Introduce partitioned and non-partitioned stream operator state

commit 2dd69e32b2ee0523bd18383f4c10072598158ba7
Author: Till Rohrmann <[email protected]>
Date:   2016-05-03T16:57:21Z

    Forward key group state to TDD

commit 45501c55fe76d5536668c1c4a1ca0bfd62ffa606
Author: Till Rohrmann <[email protected]>
Date:   2016-05-03T17:30:12Z

    Remove state descriptor generic parameter from KvState and KvStateSnapshot

commit f52ae87f831b62a7c596359fa779485e0231fee7
Author: Till Rohrmann <[email protected]>
Date:   2016-05-03T17:43:20Z

    Remove state generic parameter from KvState and KvStateSnapshot

commit 76e3db5a358857e43f8e03abdae28c476a68fc50
Author: Till Rohrmann <[email protected]>
Date:   2016-05-03T17:44:49Z

    Remove unused state descriptor from RocksDBReducingState

commit 6b8ad068b5d53e841e94debd7376313ce47c0a08
Author: Till Rohrmann <[email protected]>
Date:   2016-05-04T14:42:01Z

    Send key group state size to checkpoint coordinator

commit 62cf90df1da9a8724eb24713227318f4557c4915
Author: Till Rohrmann <[email protected]>
Date:   2016-05-04T14:52:11Z

    Add sanity check for number of key groups

commit 1a45cbcdbecd90d35457269fff6ddaa7e3889902
Author: Till Rohrmann <[email protected]>
Date:   2016-05-04T18:41:27Z

    Add max parallelism to job vertex and execution job vertex

commit acfee09eb66a35f61dd7e2d6912259de6f70e8e4
Author: Till Rohrmann <[email protected]>
Date:   2016-05-06T12:11:52Z

    Add setMaxParallelism to SingleOutputStreamOperator
    
    If max parallelism is not set, then the parallelism is chosen as the max
    parallelism.

commit 567ccb5fbd3cb9cb12acbc812ebda08b20cf71cf
Author: Till Rohrmann <[email protected]>
Date:   2016-05-06T13:29:47Z

    Add documentation

commit 2af688f7e2e5f99260755e90c1424e910cbd19d6
Author: Till Rohrmann <[email protected]>
Date:   2016-05-06T15:35:40Z

    Add setMaxParallelism to Scala API
    
    Set parallelism in StreamGraphGenerator if no default parallelism has been 
set

commit b7b302382718531f7395f0c06e426f616a8e5146
Author: Till Rohrmann <[email protected]>
Date:   2016-05-09T12:37:09Z

    Remove disposeAllStateForCurrentJob from AbstractStateBackend

commit c8dacbab2022ec216174ddf9dd5e2bddfcf7be11
Author: Till Rohrmann <[email protected]>
Date:   2016-05-10T19:57:01Z

    Add RescalingITCase

commit 7218af747db9f098b07f8375539340348a4b2146
Author: Till Rohrmann <[email protected]>
Date:   2016-05-11T07:53:52Z

    Rework CheckpointCoordinator to support rescaling

commit c81d88e2908a1f47cfa15b472e5efc871606ccc5
Author: Till Rohrmann <[email protected]>
Date:   2016-05-11T08:21:10Z

    Test rescaling job with non-partitioned state

commit 7f05674406175c9ba6b98378aa21efee35a2c746
Author: Till Rohrmann <[email protected]>
Date:   2016-05-11T08:33:53Z

    Introduce 
SavepointCoordinatorDeActivator/CheckpointCoordinatorDeActivatorCreator to 
resolve warning

commit c0eca81a8f1ac4167b0b3c6a325256456d45652c
Author: Till Rohrmann <[email protected]>
Date:   2016-05-11T08:57:22Z

    Add tests for HashKeyGroupAssigner

commit e6a4122342f11894e99f93c964919853fa5b04f2
Author: Till Rohrmann <[email protected]>
Date:   2016-05-11T11:48:16Z

    Add GenericKeyGroupStateBackendTest

commit 4455f9861a90d4d99e1a2fe963e89cbe47b264d5
Author: Till Rohrmann <[email protected]>
Date:   2016-05-11T15:52:17Z

    Add tests for configuring the KeyGroupStreamPartitioner

commit c83297936d4795dd8d6bc84654122b4f0cdc64ae
Author: Till Rohrmann <[email protected]>
Date:   2016-05-11T16:13:59Z

    Add StreamingJobGraphGeneratorTest
    
    Add test cases for connected streams

commit 5519ff559b9846196389b04bafeeb886266c0881
Author: Till Rohrmann <[email protected]>
Date:   2016-05-12T09:14:13Z

    Set proper access modifiers for CheckpointCoordinator

commit 6a36ee70f159ffdd69e81fcd202ba2fdbb743b3d
Author: Till Rohrmann <[email protected]>
Date:   2016-05-12T09:59:54Z

    Replace polling from StreamTaskTest

commit 9f2d01aaa03ebdbad3d1e62d164dd406ca364517
Author: Till Rohrmann <[email protected]>
Date:   2016-05-12T17:12:24Z

    Test checkpointing and recovery of StreamTask

----

