[
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15298355#comment-15298355
]
ASF GitHub Bot commented on FLINK-3761:
---------------------------------------
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-221309557
Thanks for the initial feedback @aljoscha :-)
The introduction of `PartitionedState` is indeed not strictly necessary for
this PR. The idea was that we will have partitioned and non-partitioned state
in the future. `PartitionedState` is the key-value state backed by the
`PartitionedStateBackend` whereas non-partitioned state is backed by the
`AbstractStateBackend`. The first non-partitioned state (apart from the state
serialized via `CheckpointStateOutputStream`) could be the redistributable
non-partitioned state necessary for the `KafkaSources`, for example. Thus, the
`PartitionedState` is more of a logical separation, and it lays the foundation
so that non-keyed stream operators can also use a proper state abstraction. But
I can revert it if you deem it redundant or premature.
It is true that the `PartitionedStateBackend` and the
`KeyGroupStateBackend` have **almost** the same signature. However, the changes
you've mentioned are imho crucial and made the whole refactoring of the state
backends necessary in the first place. The difference is that the
`KeyGroupStateBackend` is aware of the key groups and, consequently, is able to
snapshot and restore each key group individually. Trying to work around this
would mean that the `PartitionedStateBackend` always has a single key group
associated. But for that, it would have to know the sub task index of the
enclosing `StreamOperator` to assign a sensible key group index. Furthermore,
it wouldn't make sense to use any other `PartitionedStateBackend` than the
`KeyGroupStateBackend` (given that it respects the `KeyGroupAssigner`) for the
`AbstractStreamOperator`, because the data is shuffled according to the key
group assignments. In general, I think the notion of key groups are touching
too many parts of the Flink runtime so that it makes no longer sense to try to
unify the `KeyGroupStateBackends` and `PartitionedStateBackends`. The state
backends used by the `AbstractStreamOperator` have to be aware of that notion.
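To illustrate what "snapshot and restore each key group individually" could look like, here is a minimal sketch. All class and method names (`SketchKeyGroupAssigner`, `SketchKeyGroupStore`, `keyGroupOf`, `snapshotKeyGroup`, `restoreKeyGroup`) are hypothetical and are not taken from the PR, and the hash-modulo assignment is only an assumption chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; this is not Flink's actual API.
final class SketchKeyGroupAssigner {
    private final int numberOfKeyGroups;

    SketchKeyGroupAssigner(int numberOfKeyGroups) {
        if (numberOfKeyGroups <= 0) {
            throw new IllegalArgumentException("numberOfKeyGroups must be positive");
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    // Assumption: plain hash-modulo assignment, used only for illustration.
    int keyGroupOf(Object key) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }
}

final class SketchKeyGroupStore<K, V> {
    private final SketchKeyGroupAssigner assigner;
    // One map per key group, so each group can be snapshotted on its own.
    private final Map<Integer, Map<K, V>> groups = new HashMap<>();

    SketchKeyGroupStore(SketchKeyGroupAssigner assigner) {
        this.assigner = assigner;
    }

    void put(K key, V value) {
        int keyGroup = assigner.keyGroupOf(key);
        groups.computeIfAbsent(keyGroup, g -> new HashMap<>()).put(key, value);
    }

    V get(K key) {
        Map<K, V> group = groups.get(assigner.keyGroupOf(key));
        return group == null ? null : group.get(key);
    }

    // Returns a copy of the contents of a single key group.
    Map<K, V> snapshotKeyGroup(int keyGroup) {
        Map<K, V> copy = new HashMap<>();
        Map<K, V> group = groups.get(keyGroup);
        if (group != null) {
            copy.putAll(group);
        }
        return copy;
    }

    // Replaces the contents of a single key group with the given snapshot.
    void restoreKeyGroup(int keyGroup, Map<K, V> snapshot) {
        groups.put(keyGroup, new HashMap<>(snapshot));
    }
}
```

Because the unit of snapshotting is the key group rather than the whole backend, a rescaled job could hand individual key groups to different sub tasks, which a backend unaware of key groups cannot do.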
You can regard the `PartitionedStateBackend` as an internal class which was
introduced to reuse the existing state backend implementations via the
`GenericKeyGroupStateBackend`. In the future it might make sense to directly
implement the `KeyGroupStateBackend` interface to decrease the key group
overhead. It's just unfortunate that Java's package-private visibility does
not extend to sub-packages. Otherwise, I would have declared
`createPartitionedStateBackend` as package-private. But since the
`GenericKeyGroupStateBackend` resides in a sub-package of
`o.a.f.runtime.state`, it could not access the method. I think we could
refactor it the following way: remove `createPartitionedStateBackend`, make
`createKeyGroupStateBackend` abstract, let the implementations of
`AbstractStateBackend` implement a `PartitionedStateBackendFactory` interface,
and have every `AbstractStateBackend` implementation define
`createKeyGroupStateBackend` by creating a `GenericKeyGroupStateBackend`,
which requires a `PartitionedStateBackendFactory`. That would probably be a
better design.
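A rough sketch of the proposed refactoring follows. The type names come from the comment above, but every method signature is an assumption (in particular the factory method name `create` and the string-typed `put`/`get` methods), and `SketchMemoryStateBackend` is a hypothetical implementation that exists only for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins named after the classes discussed above.
// All method signatures are assumptions, not Flink's actual API.
interface PartitionedStateBackend {
    void put(String key, String value);
    String get(String key);
}

interface KeyGroupStateBackend {
    void put(int keyGroup, String key, String value);
    String get(int keyGroup, String key);
}

// Lets the generic key group backend obtain one partitioned backend per key group.
interface PartitionedStateBackendFactory {
    PartitionedStateBackend create();
}

// Reuses existing partitioned backends by keeping one instance per key group.
final class GenericKeyGroupStateBackend implements KeyGroupStateBackend {
    private final PartitionedStateBackendFactory factory;
    private final Map<Integer, PartitionedStateBackend> backends = new HashMap<>();

    GenericKeyGroupStateBackend(PartitionedStateBackendFactory factory) {
        this.factory = factory;
    }

    @Override
    public void put(int keyGroup, String key, String value) {
        backends.computeIfAbsent(keyGroup, g -> factory.create()).put(key, value);
    }

    @Override
    public String get(int keyGroup, String key) {
        PartitionedStateBackend backend = backends.get(keyGroup);
        return backend == null ? null : backend.get(key);
    }
}

abstract class AbstractStateBackend {
    // Abstract after the refactoring; createPartitionedStateBackend is gone.
    abstract KeyGroupStateBackend createKeyGroupStateBackend();
}

// Hypothetical heap-based implementation, for illustration only.
final class SketchMemoryStateBackend extends AbstractStateBackend
        implements PartitionedStateBackendFactory {

    @Override
    public PartitionedStateBackend create() {
        return new PartitionedStateBackend() {
            private final Map<String, String> values = new HashMap<>();

            @Override
            public void put(String key, String value) {
                values.put(key, value);
            }

            @Override
            public String get(String key) {
                return values.get(key);
            }
        };
    }

    @Override
    KeyGroupStateBackend createKeyGroupStateBackend() {
        // The backend passes itself as the factory to the generic wrapper.
        return new GenericKeyGroupStateBackend(this);
    }
}
```

In this arrangement the generic backend only depends on the factory interface, so it can live in a sub-package without needing access to a non-public method on `AbstractStateBackend`.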
> Introduce key group state backend
> ---------------------------------
>
> Key: FLINK-3761
> URL: https://issues.apache.org/jira/browse/FLINK-3761
> Project: Flink
> Issue Type: Sub-task
> Components: state backends
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> After an off-line discussion with [~aljoscha], we came to the conclusion that
> it would be beneficial to reflect the differences between a keyed and a
> non-keyed stream also in the state backends. A state backend which is used
> for a keyed stream offers a value, list, folding and value state and has to
> group its keys into key groups.
> A state backend for non-keyed streams can only offer a union state to make it
> work with dynamic scaling. A union state is a state which is broadcast to
> all tasks in case of a recovery. The state backends can then select what
> information they need to recover from the whole (formerly distributed) state.