[
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek updated FLINK-4940:
------------------------------------
Description:
As mentioned in
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
we need broadcast state to support job patterns where one (or several) inputs
are broadcast to all operator instances and where we keep state that is
mutated only based on input from broadcast inputs. This special restriction
ensures that the broadcast state is the same on all parallel operator instances
when checkpointing (except when using at-least-once mode). We therefore only
have to checkpoint the state of one arbitrary instance, for example instance 0.
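To illustrate the checkpointing idea, here is a minimal sketch in plain Java with no Flink dependencies. {{BroadcastInstance}} and {{BroadcastCheckpointSketch}} are hypothetical names made up for this example, and a {{HashMap}} stands in for the real state backend. The sketch snapshots only instance 0 and restores that one snapshot to every instance, as described in the earlier version of this issue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one parallel operator instance holding broadcast state. */
final class BroadcastInstance {
    final int index;
    final Map<String, String> state = new HashMap<>();

    BroadcastInstance(int index) {
        this.index = index;
    }

    /** Broadcast input is the only thing that mutates the state. */
    void processBroadcast(String key, String value) {
        state.put(key, value);
    }
}

/** Hypothetical helper, not part of Flink: snapshot one instance, restore to all. */
final class BroadcastCheckpointSketch {

    /** Snapshots only instance 0; all instances are assumed to hold identical state. */
    static Map<String, String> snapshot(List<BroadcastInstance> instances) {
        return new HashMap<>(instances.get(0).state);
    }

    /** Copies the single snapshot into each of the given number of instances. */
    static List<BroadcastInstance> restore(Map<String, String> snapshot, int parallelism) {
        List<BroadcastInstance> restored = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            BroadcastInstance instance = new BroadcastInstance(i);
            instance.state.putAll(snapshot);
            restored.add(instance);
        }
        return restored;
    }
}
```

The sketch is only correct as long as every instance has seen the same broadcast elements at checkpoint time, which is exactly the restriction stated above.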
For the different types of side inputs we need different types of state.
Luckily, the side input types align with the state types we currently have for
keyed state:
- {{ValueState}}
- {{ListState}}
- {{MapState}}
We can therefore reuse keyed state backends for our purposes but need to put a
more restrictive API in front of them: mutation of broadcast state must only be
allowed while actually processing broadcast input. Without this check, users
can (by mistake) modify broadcast state outside of broadcast processing. This
would lead to incorrect results that are very hard to notice, much less debug.
Because of the way the Flink state API works (users can get a {{State}} in
{{open()}} and work with state by calling methods on it), we have to add special
wrapper state classes that only allow modification of state while processing a
broadcast element.
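A minimal sketch of such a wrapper, in plain Java with no Flink dependencies. All names here ({{SimpleValueState}}, {{InMemoryValueState}}, {{GuardedValueState}}, {{BroadcastContext}}) are hypothetical and only stand in for the real state interfaces; how the operator tracks "currently processing a broadcast element" is an assumption of this example, not a settled design.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for Flink's ValueState, reduced to two methods. */
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

/** Plain in-memory state, standing in for state obtained from a keyed state backend. */
final class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    @Override
    public T value() {
        return current;
    }

    @Override
    public void update(T newValue) {
        current = newValue;
    }
}

/** Wrapper that allows reads at any time but writes only during broadcast processing. */
final class GuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BooleanSupplier processingBroadcast;

    GuardedValueState(SimpleValueState<T> delegate, BooleanSupplier processingBroadcast) {
        this.delegate = delegate;
        this.processingBroadcast = processingBroadcast;
    }

    @Override
    public T value() {
        return delegate.value();
    }

    @Override
    public void update(T newValue) {
        if (!processingBroadcast.getAsBoolean()) {
            throw new IllegalStateException(
                "Broadcast state may only be modified while processing a broadcast element.");
        }
        delegate.update(newValue);
    }
}

/** Hypothetical operator-side flag that is set only while a broadcast element is processed. */
final class BroadcastContext {
    private boolean processingBroadcast;

    boolean isProcessingBroadcast() {
        return processingBroadcast;
    }

    void runAsBroadcast(Runnable action) {
        processingBroadcast = true;
        try {
            action.run();
        } finally {
            processingBroadcast = false;
        }
    }
}
```

With this, a state handle obtained in {{open()}} can be kept and read freely, while a call to {{update()}} outside of {{runAsBroadcast()}} fails fast instead of silently making the instances diverge.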
For the API, I propose to add a new interface {{InternalStateAccessor}}:
{code}
/**
 * Interface for accessing persistent state.
 */
@PublicEvolving
public interface InternalStateAccessor {
    <N, S extends State> S state(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, ?> stateDescriptor);
}
{code}
This is the same as {{KeyedStateBackend.getPartitionedState()}} but allows us to
abstract away the special nature of broadcast state.
{{AbstractStreamOperator}} would get a new method {{getBroadcastStateAccessor()}}
that returns an implementation of this interface. The implementation would hold
a {{KeyedStateBackend}} but wrap the state in special wrappers that only allow
modification while processing broadcast elements (as mentioned above).
At the lower implementation levels, we have to add a new entry for our state to
{{OperatorSnapshotResult}}. For example:
{code}
private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
{code}
Also, the {{CheckpointCoordinator}} and the savepoint/checkpoint serialisation
logic will have to be adapted to support this new kind of state. Given the
ongoing work on incremental snapshotting and other new features for
{{KeyedStateBackend}}, this should be coordinated with [~StephanEwen] and/or
[[email protected]] and/or [~xiaogang.shi]. We also have to be very
careful about maintaining compatibility with savepoints from older versions.
was:
We need this to support the pattern where you have one input of a two-input
operator keyed and the other input carries messages that should be broadcast to
all operator instances. This other (side) input can contain configuration
updates, models, or what have you that are relevant for all elements in the
other stream.
We should add something akin to {{KeyedStateBackend}} where state is kept by
operator. When checkpointing, we only checkpoint the state of the first
operator instance because we assume the state to be the same across all
instances. When restoring we restore the one snapshot to all operator instances.
Current thinking is that this global/broadcast state backend should allow
access by key and possibly also iteration over all stored keys.
> Add Support for Broadcast State
> -------------------------------
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek