GitHub user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-221309557
Thanks for the initial feedback @aljoscha :-)
The introduction of `PartitionedState` is indeed not strictly necessary for
this PR. The idea was that we will have partitioned and non-partitioned state
in the future. `PartitionedState` is the key-value state backed by the
`PartitionedStateBackend` whereas non-partitioned state is backed by the
`AbstractStateBackend`. The first non-partitioned state (apart from the state
serialized via `CheckpointStateOutputStream`) could be the redistributable
non-partitioned state necessary for the `KafkaSources`, for example. Thus, the
`PartitionedState` is more of a logical separation, and it lays the foundation
for non-keyed stream operators to also use a proper state abstraction. But
I can revert it if you deem it redundant or premature.
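The following is a minimal sketch of the intended logical separation, under simplifying assumptions. The interfaces `PartitionedValueState` and `NonPartitionedListState` and their heap-based implementations are hypothetical and are not part of Flink's API. Partitioned state is always scoped to the current key. Non-partitioned state belongs to the operator instance as a whole, for example the offsets a source would need to redistribute.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical partitioned (keyed) state: every access is scoped to the current key.
interface PartitionedValueState<K, V> {
    void setCurrentKey(K key);
    V value();
    void update(V value);
}

// Hypothetical non-partitioned state: one collection per operator instance,
// e.g. source offsets that could be redistributed when rescaling.
interface NonPartitionedListState<T> {
    void add(T element);
    Iterable<T> get();
}

// Heap-based keyed state: one value per key, selected via setCurrentKey.
class HeapPartitionedValueState<K, V> implements PartitionedValueState<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private K currentKey;

    public void setCurrentKey(K key) { this.currentKey = key; }
    public V value() { return values.get(currentKey); }
    public void update(V value) { values.put(currentKey, value); }
}

// Heap-based operator state: not tied to any key.
class HeapNonPartitionedListState<T> implements NonPartitionedListState<T> {
    private final List<T> elements = new ArrayList<>();

    public void add(T element) { elements.add(element); }
    public Iterable<T> get() { return elements; }
}
```

A keyed operator would call `setCurrentKey` before each record. A non-keyed operator such as a source would only ever touch the list state.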
It is true that the `PartitionedStateBackend` and the
`KeyGroupStateBackend` have **almost** the same signature. However, the changes
you've mentioned are imho crucial and made the whole refactoring of the state
backends necessary in the first place. The difference is that the
`KeyGroupStateBackend` is aware of the key groups and, consequently, is able to
snapshot and restore each key group individually. Working around this would
mean always associating a single key group with the `PartitionedStateBackend`.
But for that, the backend would have to know the subtask index of the
enclosing `StreamOperator` to assign a sensible key group index. Furthermore,
it wouldn't make sense to use any other `PartitionedStateBackend` than the
`KeyGroupStateBackend` (given that it respects the `KeyGroupAssigner`) for the
`AbstractStreamOperator`, because the data is shuffled according to the key
group assignments. In general, I think the notion of key groups touches
too many parts of the Flink runtime for it to still make sense to try to unify
the `KeyGroupStateBackends` and `PartitionedStateBackends`. The state backends
used by the `AbstractStreamOperator` have to be aware of that notion.
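The sketch below shows why key-group awareness matters for snapshotting. It assumes a hypothetical `HashKeyGroupAssigner` that maps a key to `hashCode() mod numberOfKeyGroups`, and a hypothetical `SimpleKeyGroupStateBackend` that keeps one map per key group. Neither is Flink's actual `KeyGroupAssigner` or `KeyGroupStateBackend`. The only point illustrated is that state stored per key group can be snapshotted and restored group by group, for example by a different parallel instance after rescaling.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical assigner: maps a key to one of numberOfKeyGroups groups.
// Math.floorMod keeps the index non-negative for negative hash codes.
class HashKeyGroupAssigner<K> {
    private final int numberOfKeyGroups;

    HashKeyGroupAssigner(int numberOfKeyGroups) { this.numberOfKeyGroups = numberOfKeyGroups; }

    int getKeyGroupIndex(K key) { return Math.floorMod(key.hashCode(), numberOfKeyGroups); }
}

// Hypothetical key-group-aware backend: keeps one map per key group so that
// each key group can be snapshotted and restored on its own.
class SimpleKeyGroupStateBackend<K, V> {
    private final HashKeyGroupAssigner<K> assigner;
    private final Map<Integer, Map<K, V>> stateByKeyGroup = new HashMap<>();

    SimpleKeyGroupStateBackend(HashKeyGroupAssigner<K> assigner) { this.assigner = assigner; }

    void put(K key, V value) {
        stateByKeyGroup
            .computeIfAbsent(assigner.getKeyGroupIndex(key), g -> new HashMap<>())
            .put(key, value);
    }

    V get(K key) {
        Map<K, V> group = stateByKeyGroup.get(assigner.getKeyGroupIndex(key));
        return group == null ? null : group.get(key);
    }

    // Copies the state of a single key group; a real backend would write it to a checkpoint stream.
    Map<K, V> snapshotKeyGroup(int keyGroupIndex) {
        return new HashMap<>(stateByKeyGroup.getOrDefault(keyGroupIndex, new HashMap<>()));
    }

    // Restores one key group, e.g. on the parallel instance that took it over after rescaling.
    void restoreKeyGroup(int keyGroupIndex, Map<K, V> snapshot) {
        stateByKeyGroup.put(keyGroupIndex, new HashMap<>(snapshot));
    }
}
```

A backend without this per-group layout could only snapshot everything at once. It would therefore need a single, externally chosen key group index, which is the workaround described above.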
You can regard the `PartitionedStateBackend` as an internal class which was
introduced to reuse the existing state backend implementations via the
`GenericKeyGroupStateBackend`. In the future it might make sense to directly
implement the `KeyGroupStateBackend` interface to decrease the key group
overhead. It's just unfortunate that Java's package-private visibility does not
extend to sub-packages. Otherwise, I would have declared
`createPartitionedStateBackend` as package private; but since the
`GenericKeyGroupStateBackend` resides in a sub-package of
`o.a.f.runtime.state`, it could not access that method. However, I think we
could refactor it the following way:

1. Remove `createPartitionedStateBackend`.
2. Make `createKeyGroupStateBackend` abstract.
3. Let the implementations of `AbstractStateBackend` implement a
   `PartitionedStateBackendFactory` interface.
4. Implement `createKeyGroupStateBackend` in every `AbstractStateBackend`
   implementation by creating a `GenericKeyGroupStateBackend`, which requires a
   `PartitionedStateBackendFactory`.

That would probably be a better design.
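The proposed refactoring could look roughly like the sketch below. All types are simplified, hypothetical stand-ins, and the real interfaces have more methods. `InMemoryStateBackend` is an invented example backend. Here `AbstractStateBackend` only declares an abstract `createKeyGroupStateBackend`. A concrete backend passes itself as the `PartitionedStateBackendFactory` to a `GenericKeyGroupStateBackend`, which lazily creates one partitioned backend per key group.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified partitioned backend: plain key/value access.
interface PartitionedStateBackend {
    void put(Object key, Object value);
    Object get(Object key);
}

// Factory interface that replaces createPartitionedStateBackend on AbstractStateBackend.
interface PartitionedStateBackendFactory {
    PartitionedStateBackend createPartitionedStateBackend();
}

// Hypothetical, simplified key-group-aware backend.
interface KeyGroupStateBackend {
    void put(int keyGroup, Object key, Object value);
    Object get(int keyGroup, Object key);
}

// Reuses any partitioned backend by keeping one instance per key group.
class GenericKeyGroupStateBackend implements KeyGroupStateBackend {
    private final PartitionedStateBackendFactory factory;
    private final Map<Integer, PartitionedStateBackend> backends = new HashMap<>();

    GenericKeyGroupStateBackend(PartitionedStateBackendFactory factory) { this.factory = factory; }

    public void put(int keyGroup, Object key, Object value) {
        backends.computeIfAbsent(keyGroup, g -> factory.createPartitionedStateBackend()).put(key, value);
    }

    public Object get(int keyGroup, Object key) {
        PartitionedStateBackend backend = backends.get(keyGroup);
        return backend == null ? null : backend.get(key);
    }

    int numberOfKeyGroupBackends() { return backends.size(); }
}

abstract class AbstractStateBackend {
    // Abstract: each concrete backend decides how it provides key-group-aware state.
    abstract KeyGroupStateBackend createKeyGroupStateBackend();
}

// Hypothetical backend: acts as its own factory and delegates to the generic wrapper.
class InMemoryStateBackend extends AbstractStateBackend implements PartitionedStateBackendFactory {
    public PartitionedStateBackend createPartitionedStateBackend() {
        Map<Object, Object> map = new HashMap<>();
        return new PartitionedStateBackend() {
            public void put(Object key, Object value) { map.put(key, value); }
            public Object get(Object key) { return map.get(key); }
        };
    }

    KeyGroupStateBackend createKeyGroupStateBackend() {
        return new GenericKeyGroupStateBackend(this);
    }
}
```

With this shape, `createPartitionedStateBackend` is no longer declared by `AbstractStateBackend` itself. The generic wrapper depends only on the factory interface, so the package in which it lives does not matter.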