Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-221309557
  
    Thanks for the initial feedback @aljoscha :-)
    
    The introduction of `PartitionedState` is indeed not strictly necessary for 
this PR. The idea was that we will have partitioned and non-partitioned state 
in the future. `PartitionedState` is the key-value state backed by the 
`PartitionedStateBackend` whereas non-partitioned state is backed by the 
`AbstractStateBackend`. The first non-partitioned state (apart from the state 
serialized via `CheckpointStateOutputStream`) could be the redistributable 
non-partitioned state necessary for the `KafkaSources`, for example. Thus, the 
`PartitionedState` is more of a logical separation and it lays the foundation 
so that non-keyed stream operators can also use a proper state abstraction. But 
I can revert it if you deem it redundant or premature.
    
    It is true that the `PartitionedStateBackend` and the 
`KeyGroupStateBackend` have **almost** the same signature. However, the changes 
you've mentioned are imho crucial and made the whole refactoring of the state 
backends necessary in the first place. The difference is that the 
`KeyGroupStateBackend` is aware of the key groups and, consequently, is able to 
snapshot and restore each key group individually. Trying to work around this 
would mean that the `PartitionedStateBackend` always has a single key group 
associated. But for that, it would have to know the subtask index of the 
enclosing `StreamOperator` to assign a sensible key group index. Furthermore, 
it wouldn't make sense to use any other `PartitionedStateBackend` than the 
`KeyGroupStateBackend` (given that it respects the `KeyGroupAssigner`) for the 
`AbstractStreamOperator`, because the data is shuffled according to the key 
group assignments. In general, I think the notion of key groups touches too 
many parts of the Flink runtime, so it no longer makes sense to try to unify the 
`KeyGroupStateBackends` and `PartitionedStateBackends`. The state backends used 
by the `AbstractStreamOperator` have to be aware of that notion.
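
    To illustrate what key-group awareness buys us, here is a minimal sketch. 
It is not code from this PR: `KeyGroupSketch`, `ToyKeyGroupBackend`, the 
hash-based `keyGroupOf` assignment and all method signatures are assumptions 
made up for the example. It only shows that a backend which stores state per 
key group can snapshot and restore a single key group independently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; names and signatures are NOT the Flink API of this PR. */
final class KeyGroupSketch {

    /** Assumed assignment: hash the key into one of numKeyGroups groups. */
    public static int keyGroupOf(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Toy key-group-aware backend: state is bucketed by key group index. */
    public static final class ToyKeyGroupBackend {
        private final int numKeyGroups;
        private final Map<Integer, Map<String, Long>> stateByKeyGroup = new HashMap<>();

        public ToyKeyGroupBackend(int numKeyGroups) {
            this.numKeyGroups = numKeyGroups;
        }

        public void put(String key, long value) {
            int group = keyGroupOf(key, numKeyGroups);
            stateByKeyGroup
                .computeIfAbsent(group, g -> new HashMap<String, Long>())
                .put(key, value);
        }

        public Long get(String key) {
            Map<String, Long> group = stateByKeyGroup.get(keyGroupOf(key, numKeyGroups));
            return group == null ? null : group.get(key);
        }

        /** Snapshots one key group as an independent copy. */
        public Map<String, Long> snapshotKeyGroup(int keyGroup) {
            Map<String, Long> group = stateByKeyGroup.get(keyGroup);
            return group == null
                ? new HashMap<String, Long>()
                : new HashMap<String, Long>(group);
        }

        /** Restores one key group, e.g. on a different subtask after rescaling. */
        public void restoreKeyGroup(int keyGroup, Map<String, Long> snapshot) {
            stateByKeyGroup.put(keyGroup, new HashMap<String, Long>(snapshot));
        }
    }

    public static void main(String[] args) {
        ToyKeyGroupBackend source = new ToyKeyGroupBackend(4);
        source.put("user-1", 7L);
        int group = keyGroupOf("user-1", 4);

        // Move exactly one key group to another backend instance.
        ToyKeyGroupBackend target = new ToyKeyGroupBackend(4);
        target.restoreKeyGroup(group, source.snapshotKeyGroup(group));
        System.out.println(target.get("user-1")); // prints 7
    }
}
```

    A backend without this bucketing could only snapshot its state as a 
whole, which is why it would need to be pinned to a single key group.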
    
    You can regard the `PartitionedStateBackend` as an internal class which was 
introduced to reuse the existing state backend implementations via the 
`GenericKeyGroupStateBackend`. In the future it might make sense to directly 
implement the `KeyGroupStateBackend` interface to decrease the key group 
overhead. It's just unfortunate that Java's package-private visibility does 
not extend to sub-packages; otherwise, I would have declared 
`createPartitionedStateBackend` as package private. Since the 
`GenericKeyGroupStateBackend` resides in a sub-package of 
`o.a.f.runtime.state`, it could not access such a method. But I think we could 
refactor it the following way: Remove `createPartitionedStateBackend`, make 
`createKeyGroupStateBackend` abstract, let the implementations of 
`AbstractStateBackend` implement a `PartitionedStateBackendFactory` interface 
and implement the `createKeyGroupStateBackend` method in all 
`AbstractStateBackend` implementations by creating a 
`GenericKeyGroupStateBackend`, which requires a 
`PartitionedStateBackendFactory`. That would probably be a better 
design.
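
    The proposed refactoring could look roughly like the following sketch. 
The type names are the ones mentioned above, but every method signature, the 
string-based state, and the `ToyMemoryStateBackend` and 
`HeapPartitionedStateBackend` classes are assumptions made up for 
illustration, not the code in this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the proposed design; all signatures are assumptions. */
final class FactoryRefactoringSketch {

    public interface PartitionedStateBackend {
        void put(String key, String value);
        String get(String key);
    }

    /** Replaces the public createPartitionedStateBackend method on the backend. */
    public interface PartitionedStateBackendFactory {
        PartitionedStateBackend createPartitionedStateBackend();
    }

    public interface KeyGroupStateBackend {
        void put(int keyGroup, String key, String value);
        String get(int keyGroup, String key);
    }

    /** Reuses existing backends by creating one partitioned backend per key group. */
    public static final class GenericKeyGroupStateBackend implements KeyGroupStateBackend {
        private final PartitionedStateBackendFactory factory;
        private final Map<Integer, PartitionedStateBackend> backends = new HashMap<>();

        public GenericKeyGroupStateBackend(PartitionedStateBackendFactory factory) {
            this.factory = factory;
        }

        @Override
        public void put(int keyGroup, String key, String value) {
            backends
                .computeIfAbsent(keyGroup, g -> factory.createPartitionedStateBackend())
                .put(key, value);
        }

        @Override
        public String get(int keyGroup, String key) {
            PartitionedStateBackend backend = backends.get(keyGroup);
            return backend == null ? null : backend.get(key);
        }
    }

    /** createKeyGroupStateBackend is abstract; no createPartitionedStateBackend here. */
    public abstract static class AbstractStateBackend {
        public abstract KeyGroupStateBackend createKeyGroupStateBackend();
    }

    /** Toy heap-backed partitioned state (hypothetical). */
    public static final class HeapPartitionedStateBackend implements PartitionedStateBackend {
        private final Map<String, String> state = new HashMap<>();

        @Override
        public void put(String key, String value) { state.put(key, value); }

        @Override
        public String get(String key) { return state.get(key); }
    }

    /** A concrete backend acts as its own factory (hypothetical implementation). */
    public static final class ToyMemoryStateBackend extends AbstractStateBackend
            implements PartitionedStateBackendFactory {

        @Override
        public PartitionedStateBackend createPartitionedStateBackend() {
            return new HeapPartitionedStateBackend();
        }

        @Override
        public KeyGroupStateBackend createKeyGroupStateBackend() {
            return new GenericKeyGroupStateBackend(this);
        }
    }

    public static void main(String[] args) {
        KeyGroupStateBackend backend = new ToyMemoryStateBackend().createKeyGroupStateBackend();
        backend.put(3, "k", "v");
        System.out.println(backend.get(3, "k")); // prints v
    }
}
```

    In this shape, callers only see `createKeyGroupStateBackend` on 
`AbstractStateBackend`, and the `GenericKeyGroupStateBackend` depends on the 
factory interface rather than on a method that would have to be public.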


