[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15298355#comment-15298355
 ] 

ASF GitHub Bot commented on FLINK-3761:
---------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-221309557
  
    Thanks for the initial feedback @aljoscha :-)
    
    The introduction of `PartitionedState` is indeed not strictly necessary for 
this PR. The idea was that we will have partitioned and non-partitioned state 
in the future. `PartitionedState` is the key-value state backed by the 
`PartitionedStateBackend` whereas non-partitioned state is backed by the 
`AbstractStateBackend`. The first non-partitioned state (apart from the state 
serialized via `CheckpointStateOutputStream`) could be the redistributable 
non-partitioned state necessary for the `KafkaSources`, for example. Thus, the 
`PartitionedState` is more of a logical separation and it lays the foundation 
so that also non-keyed stream operators can use a proper state abstraction. But 
I can revert it if you deem it redundant or premature.
    
    It is true that the `PartitionedStateBackend` and the 
`KeyGroupStateBackend` have **almost** the same signature. However, the changes 
you've mentioned are imho crucial and made the whole refactoring of the state 
backends necessary in the first place. The difference is that the 
`KeyGroupStateBackend` is aware of the key groups and, consequently, is able to 
snapshot and restore each key group individually. Trying to work around this 
would mean that the `PartitionedStateBackend` always has a single key group 
associated with it. But for that, it would have to know the subtask index of the 
enclosing `StreamOperator` to assign a sensible key group index. Furthermore, 
it wouldn't make sense to use any other `PartitionedStateBackend` than the 
`KeyGroupStateBackend` (given that it respects the `KeyGroupAssigner`) for the 
`AbstractStreamOperator`, because the data is shuffled according to the key 
group assignments. In general, I think the notion of key groups touches too 
many parts of the Flink runtime for it to still make sense to try to unify the 
`KeyGroupStateBackend` and the `PartitionedStateBackend`. The state backends 
used by the `AbstractStreamOperator` have to be aware of that notion.
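    
    To illustrate what snapshotting and restoring each key group individually 
means, here is a minimal sketch. It is not code from the PR: every class and 
method name below is hypothetical, and the hash-modulo assignment rule is only 
an assumption standing in for whatever the `KeyGroupAssigner` actually does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; none of these types are Flink's actual classes.
final class KeyGroupSketch {

    // Assumed assignment rule: hash of the key modulo the number of key groups.
    static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    // Toy backend that is aware of key groups: state is stored per key group.
    static final class KeyGroupAwareBackend<K, V> {
        private final int numberOfKeyGroups;
        private final Map<Integer, Map<K, V>> stateByKeyGroup = new TreeMap<>();

        KeyGroupAwareBackend(int numberOfKeyGroups) {
            this.numberOfKeyGroups = numberOfKeyGroups;
        }

        void put(K key, V value) {
            int keyGroup = assignToKeyGroup(key, numberOfKeyGroups);
            stateByKeyGroup.computeIfAbsent(keyGroup, g -> new HashMap<>()).put(key, value);
        }

        V get(K key) {
            Map<K, V> group = stateByKeyGroup.get(assignToKeyGroup(key, numberOfKeyGroups));
            return group == null ? null : group.get(key);
        }

        // Copies the state of a single key group, independent of all others.
        Map<K, V> snapshotKeyGroup(int keyGroup) {
            return new HashMap<>(stateByKeyGroup.getOrDefault(keyGroup, new HashMap<>()));
        }

        // Installs a previously taken snapshot for a single key group.
        void restoreKeyGroup(int keyGroup, Map<K, V> snapshot) {
            stateByKeyGroup.put(keyGroup, new HashMap<>(snapshot));
        }
    }

    public static void main(String[] args) {
        KeyGroupAwareBackend<String, Integer> source = new KeyGroupAwareBackend<>(4);
        source.put("a", 1);
        source.put("b", 2);

        // Hand over only the key group of "a" to another backend instance,
        // as could happen when key groups are redistributed among subtasks.
        int keyGroup = assignToKeyGroup("a", 4);
        KeyGroupAwareBackend<String, Integer> target = new KeyGroupAwareBackend<>(4);
        target.restoreKeyGroup(keyGroup, source.snapshotKeyGroup(keyGroup));

        System.out.println("a -> " + target.get("a"));
        System.out.println("b -> " + target.get("b"));
    }
}
```

    A backend without the notion of key groups could only snapshot its state 
as a whole, which is why a single key group could not be moved on its own.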
    
    You can regard the `PartitionedStateBackend` as an internal class which was 
introduced to reuse the existing state backend implementations via the 
`GenericKeyGroupStateBackend`. In the future it might make sense to directly 
implement the `KeyGroupStateBackend` interface to decrease the key group 
overhead. It's just unfortunate that Java's package-private visibility does not 
extend to sub-packages. Otherwise, I would have declared 
`createPartitionedStateBackend` as package-private. But since the 
`GenericKeyGroupStateBackend` resides in a sub-package of 
`o.a.f.runtime.state`, it could not access the method then. I think we could 
refactor it the following way: remove `createPartitionedStateBackend`, make 
`createKeyGroupStateBackend` abstract, let the implementations of 
`AbstractStateBackend` implement a `PartitionedStateBackendFactory` interface, 
and have every `AbstractStateBackend` implementation define 
`createKeyGroupStateBackend` so that it creates a 
`GenericKeyGroupStateBackend`, which requires a 
`PartitionedStateBackendFactory`. That would probably be a better design.
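    
    The proposed refactoring could look roughly like the sketch below. The 
type names are the ones mentioned above, but all signatures, the 
string-to-string state, the hash-modulo key group assignment and the 
`ToyMemoryStateBackend` class are assumptions made for illustration, not the 
actual interfaces of the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the key-value state backend (signatures assumed).
interface PartitionedStateBackend {
    void put(String key, String value);
    String get(String key);
}

// Factory interface from the proposal; the method name is an assumption.
interface PartitionedStateBackendFactory {
    PartitionedStateBackend createPartitionedStateBackend();
}

// Simplified stand-in for the key-group-aware backend (signatures assumed).
interface KeyGroupStateBackend {
    void put(String key, String value);
    String get(String key);
}

// Reuses plain partitioned backends by keeping one of them per key group.
final class GenericKeyGroupStateBackend implements KeyGroupStateBackend {
    private final PartitionedStateBackendFactory factory;
    private final int numberOfKeyGroups;
    private final Map<Integer, PartitionedStateBackend> backends = new HashMap<>();

    GenericKeyGroupStateBackend(PartitionedStateBackendFactory factory, int numberOfKeyGroups) {
        this.factory = factory;
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    // Assumed assignment rule: hash of the key modulo the number of key groups.
    private PartitionedStateBackend backendFor(String key) {
        int keyGroup = Math.floorMod(key.hashCode(), numberOfKeyGroups);
        return backends.computeIfAbsent(keyGroup, g -> factory.createPartitionedStateBackend());
    }

    @Override
    public void put(String key, String value) {
        backendFor(key).put(key, value);
    }

    @Override
    public String get(String key) {
        return backendFor(key).get(key);
    }
}

// Only the key group factory method remains on the abstract base class.
abstract class AbstractStateBackend {
    abstract KeyGroupStateBackend createKeyGroupStateBackend(int numberOfKeyGroups);
}

// Hypothetical concrete backend that doubles as the factory.
final class ToyMemoryStateBackend extends AbstractStateBackend
        implements PartitionedStateBackendFactory {

    @Override
    public PartitionedStateBackend createPartitionedStateBackend() {
        Map<String, String> store = new HashMap<>();
        return new PartitionedStateBackend() {
            @Override
            public void put(String key, String value) {
                store.put(key, value);
            }

            @Override
            public String get(String key) {
                return store.get(key);
            }
        };
    }

    @Override
    KeyGroupStateBackend createKeyGroupStateBackend(int numberOfKeyGroups) {
        return new GenericKeyGroupStateBackend(this, numberOfKeyGroups);
    }
}

final class RefactoringSketch {
    public static void main(String[] args) {
        KeyGroupStateBackend backend = new ToyMemoryStateBackend().createKeyGroupStateBackend(4);
        backend.put("a", "1");
        System.out.println("a -> " + backend.get("a"));
    }
}
```

    With this shape, the `GenericKeyGroupStateBackend` depends only on the 
factory interface, so it can live in any package without the base class having 
to expose `createPartitionedStateBackend`.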


> Introduce key group state backend
> ---------------------------------
>
>                 Key: FLINK-3761
>                 URL: https://issues.apache.org/jira/browse/FLINK-3761
>             Project: Flink
>          Issue Type: Sub-task
>          Components: state backends
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> After an off-line discussion with [~aljoscha], we came to the conclusion that 
> it would be beneficial to reflect the differences between a keyed and a 
> non-keyed stream also in the state backends. A state backend which is used 
> for a keyed stream offers a value, list, folding and reducing state and has to 
> group its keys into key groups. 
> A state backend for non-keyed streams can only offer a union state to make it 
> work with dynamic scaling. A union state is a state which is broadcast to 
> all tasks in case of a recovery. The state backends can then select what 
> information they need to recover from the whole state (formerly distributed).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
