[
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek updated FLINK-4940:
------------------------------------
Description:
As mentioned in
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
we need broadcast state to support job patterns where one (or several) inputs
are broadcast to all operator instances and where we keep state that is
mutated only based on input from broadcast inputs. This special restriction
ensures that the broadcast state is the same on all parallel operator instances
when checkpointing (except when using at-least-once mode). We therefore only
have to checkpoint the state of one arbitrary instance, for example instance 0.
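The argument for checkpointing a single instance can be illustrated with a small model. This is only a sketch under stated assumptions: {{InstanceSketch}}, {{SingleInstanceSnapshotDemo}}, and the map-based state are hypothetical stand-ins and not Flink classes, and a single broadcast input is assumed so that all instances see the same elements in the same order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one parallel operator instance holding broadcast state.
final class InstanceSketch {
    final Map<String, String> broadcastState = new HashMap<>();

    // Every instance receives every broadcast element and applies it identically.
    void onBroadcast(String key, String value) {
        broadcastState.put(key, value);
    }
}

final class SingleInstanceSnapshotDemo {
    // Snapshot only instance 0; the other instances hold the same content.
    static Map<String, String> snapshot(List<InstanceSketch> instances) {
        return new HashMap<>(instances.get(0).broadcastState);
    }

    // On restore, every instance receives its own copy of the one snapshot.
    static List<InstanceSketch> restore(Map<String, String> snapshot, int parallelism) {
        List<InstanceSketch> restored = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            InstanceSketch instance = new InstanceSketch();
            instance.broadcastState.putAll(snapshot);
            restored.add(instance);
        }
        return restored;
    }

    public static void main(String[] args) {
        List<InstanceSketch> instances = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            instances.add(new InstanceSketch());
        }
        // The same broadcast element is delivered to all instances.
        for (InstanceSketch instance : instances) {
            instance.onBroadcast("threshold", "10");
        }
        Map<String, String> snapshot = snapshot(instances);
        System.out.println(restore(snapshot, 3).get(2).broadcastState);
    }
}
```

The sketch ignores the at-least-once case mentioned above, where instances may differ at checkpoint time.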
For the different types of side inputs we need different types of state.
Luckily, the side input types align with the state types we currently have for
keyed state:
- {{ValueState}}
- {{ListState}}
- {{MapState}}
We can therefore reuse keyed state backends for our purposes but need to put a
more restrictive API in front of them: mutation of broadcast state must only be
allowed when actually processing broadcast input. Without this check, users
could (by mistake) modify broadcast state while processing other input. The
state would then diverge between parallel instances, leading to incorrect
results that are very hard to notice, much less debug.
With the way the Flink state API works (users can get a {{State}} in {{open()}}
and work with state by calling methods on that) we have to add special wrapping
state classes that only allow modification of state when processing a broadcast
element.
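A minimal sketch of such a wrapping state class is shown here. It is an illustration only: {{SimpleValueState}}, {{HeapValueState}}, and {{GuardedValueState}} are hypothetical names, and the two-method interface is a reduced stand-in for Flink's {{ValueState}} so that the example compiles without Flink on the classpath. How the operator signals that it is processing a broadcast element (here a {{BooleanSupplier}}) is likewise an assumption.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for Flink's ValueState, reduced to two methods.
interface SimpleValueState<T> {
    T value();
    void update(T value);
}

// Plain in-memory state, standing in for state obtained from a state backend.
final class HeapValueState<T> implements SimpleValueState<T> {
    private T current;

    public T value() { return current; }

    public void update(T value) { current = value; }
}

// Wrapper that allows reads at any time but rejects writes unless the
// operator is currently processing a broadcast element.
final class GuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BooleanSupplier processingBroadcast;

    GuardedValueState(SimpleValueState<T> delegate, BooleanSupplier processingBroadcast) {
        this.delegate = delegate;
        this.processingBroadcast = processingBroadcast;
    }

    public T value() { return delegate.value(); }

    public void update(T value) {
        if (!processingBroadcast.getAsBoolean()) {
            throw new IllegalStateException(
                "Broadcast state may only be modified while processing a broadcast element.");
        }
        delegate.update(value);
    }
}

final class GuardedStateDemo {
    public static void main(String[] args) {
        boolean[] inBroadcast = {false};
        SimpleValueState<String> state =
            new GuardedValueState<>(new HeapValueState<>(), () -> inBroadcast[0]);

        inBroadcast[0] = true;   // operator starts processing a broadcast element
        state.update("rule-1");
        inBroadcast[0] = false;  // back to regular input

        System.out.println(state.value());
        try {
            state.update("rule-2");  // not allowed outside broadcast processing
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the user obtains the wrapper once in {{open()}} and keeps calling methods on it, the check has to happen on every write rather than when the state is handed out.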
For the API, I propose to add a new interface `InternalStateAccessor`:
{code}
/**
 * Interface for accessing persistent state.
 */
@PublicEvolving
public interface InternalStateAccessor {

    <N, S extends State> S state(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, ?> stateDescriptor);
}
{code}
This is the same as `KeyedStateBackend.getPartitionedState()` but allows us to
abstract away the special nature of broadcast state. It is also meant as an
internal interface and is not to be exposed to user functions. Only operators
should deal with it.
{{AbstractStreamOperator}} would get a new method `getBroadcastStateAccessor()`
that returns an implementation of this interface. The implementation would have
a {{KeyedStateBackend}} but wrap the state in special wrappers that only allow
modification when processing broadcast elements (as mentioned above).
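One way the operator-side accessor could enforce this is sketched here. All names ({{BroadcastStateAccessorSketch}}, {{ValueHandle}}, {{processBroadcastElement}}) are hypothetical, and a plain map stands in for the {{KeyedStateBackend}} that the real implementation would delegate to. The sketch only shows the flag handling and is not the proposed implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a value-state handle.
interface ValueHandle<T> {
    T get();
    void set(T value);
}

// Hypothetical accessor owned by an operator. The map stands in for the
// state backend that would hold the actual state.
final class BroadcastStateAccessorSketch {
    private final Map<String, Object> backend = new HashMap<>();
    private boolean processingBroadcast;

    // Returns a handle whose writes are checked against the accessor's flag.
    <T> ValueHandle<T> state(String name) {
        return new ValueHandle<T>() {
            @SuppressWarnings("unchecked")
            public T get() { return (T) backend.get(name); }

            public void set(T value) {
                if (!processingBroadcast) {
                    throw new IllegalStateException(
                        "State '" + name + "' is read-only outside broadcast processing.");
                }
                backend.put(name, value);
            }
        };
    }

    // Called by the operator for broadcast elements only. The flag is reset
    // in a finally block so it does not stay set if the user code throws.
    <E> void processBroadcastElement(E element, Consumer<E> userCode) {
        processingBroadcast = true;
        try {
            userCode.accept(element);
        } finally {
            processingBroadcast = false;
        }
    }
}

final class AccessorDemo {
    public static void main(String[] args) {
        BroadcastStateAccessorSketch accessor = new BroadcastStateAccessorSketch();
        ValueHandle<Integer> threshold = accessor.state("threshold");

        // Writes succeed only inside the broadcast-processing scope.
        accessor.processBroadcastElement(42, threshold::set);
        System.out.println(threshold.get());
    }
}
```

Keeping the flag inside the accessor means user functions never see it; they only observe that writes fail outside broadcast processing.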
On the lower implementation levels, we have to add a new entry for our state to
`OperatorSnapshotResult`. For example:
{code}
private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
{code}
The {{CheckpointCoordinator}} and the savepoint/checkpoint serialisation logic
will also have to be adapted to support this new kind of state. Given the ongoing
changes in supporting incremental snapshotting and other new features for
`KeyedStateBackend`, this should be coordinated with [~StephanEwen] and/or
[[email protected]] and/or [~xiaogang.shi]. We also have to be very
careful about maintaining compatibility with savepoints from older versions.
was:
As mentioned in
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
we need broadcast state to support job patterns where one (or several) inputs
are broadcast to all operator instances and where we keep state that is
mutated only based on input from broadcast inputs. This special restriction
ensures that the broadcast state is the same on all parallel operator instances
when checkpointing (except when using at-least-once mode). We therefore only
have to checkpoint the state of one arbitrary instance, for example instance 0.
For the different types of side inputs we need different types of state.
Luckily, the side input types align with the state types we currently have for
keyed state:
- {{ValueState}}
- {{ListState}}
- {{MapState}}
We can therefore reuse keyed state backends for our purposes but need to put a
more restrictive API in front of them: mutation of broadcast state must only be
allowed when actually processing broadcast input. Without this check, users
could (by mistake) modify broadcast state while processing other input. The
state would then diverge between parallel instances, leading to incorrect
results that are very hard to notice, much less debug.
With the way the Flink state API works (users can get a {{State}} in {{open()}}
and work with state by calling methods on that) we have to add special wrapping
state classes that only allow modification of state when processing a broadcast
element.
For the API, I propose to add a new interface `InternalStateAccessor`:
{code}
/**
 * Interface for accessing persistent state.
 */
@PublicEvolving
public interface InternalStateAccessor {

    <N, S extends State> S state(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, ?> stateDescriptor);
}
{code}
This is the same as `KeyedStateBackend.getPartitionedState()` but allows us to
abstract away the special nature of broadcast state.
{{AbstractStreamOperator}} would get a new method `getBroadcastStateAccessor()`
that returns an implementation of this interface. The implementation would have
a {{KeyedStateBackend}} but wrap the state in special wrappers that only allow
modification when processing broadcast elements (as mentioned above).
On the lower implementation levels, we have to add a new entry for our state to
`OperatorSnapshotResult`. For example:
{code}
private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
{code}
The {{CheckpointCoordinator}} and the savepoint/checkpoint serialisation logic
will also have to be adapted to support this new kind of state. Given the ongoing
changes in supporting incremental snapshotting and other new features for
`KeyedStateBackend`, this should be coordinated with [~StephanEwen] and/or
[[email protected]] and/or [~xiaogang.shi]. We also have to be very
careful about maintaining compatibility with savepoints from older versions.
> Add Support for Broadcast State
> -------------------------------
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)