[ 
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4940:
------------------------------------
    Description: 
As mentioned in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
 we need broadcast state to support job patterns where one (or several) inputs 
are broadcast to all operator instances and where we keep state that is 
mutated only based on input from broadcast inputs. This special restriction 
ensures that the broadcast state is the same on all parallel operator instances 
when checkpointing (except when using at-least-once mode). We therefore only 
have to checkpoint the state of one arbitrary instance, for example instance 0.
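
The checkpointing idea above can be sketched in plain Java without any Flink dependencies. This is only an illustration under assumptions: {{BroadcastInstance}} and {{BroadcastCheckpointSketch}} are hypothetical names, the state is modelled as a simple map, and the restore step (one snapshot handed to every instance) follows the earlier version of this description rather than anything that exists in Flink today.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one parallel operator instance holding broadcast state. */
final class BroadcastInstance {
    private final Map<String, String> state = new HashMap<>();

    /** Every instance sees every broadcast element, so all copies evolve identically. */
    void processBroadcast(String key, String value) {
        state.put(key, value);
    }

    /** Returns a defensive copy of the current state. */
    Map<String, String> snapshot() {
        return new HashMap<>(state);
    }

    /** Replaces the current state with the contents of the given snapshot. */
    void restore(Map<String, String> snapshot) {
        state.clear();
        state.putAll(snapshot);
    }
}

/** Hypothetical helper: checkpoint one instance, restore the snapshot to all. */
final class BroadcastCheckpointSketch {
    /** Only instance 0 is snapshotted; the other copies are assumed to be identical. */
    static Map<String, String> checkpoint(List<BroadcastInstance> instances) {
        return instances.get(0).snapshot();
    }

    /** The single snapshot is handed to every instance, whatever the new parallelism. */
    static List<BroadcastInstance> restore(Map<String, String> snapshot, int parallelism) {
        List<BroadcastInstance> restored = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            BroadcastInstance instance = new BroadcastInstance();
            instance.restore(snapshot);
            restored.add(instance);
        }
        return restored;
    }
}
```

Because the snapshot does not depend on the number of instances, restoring with a different parallelism needs no redistribution logic in this model.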

For the different types of side inputs we need different types of state. 
Luckily, the side input types align with the state types we currently have for 
keyed state:
 - {{ValueState}}
 - {{ListState}}
 - {{MapState}}

We can therefore reuse keyed state backends for our purposes but need to put a 
more restrictive API in front of them: mutation of broadcast state must only be 
allowed when actually processing broadcast input. If we don't have this check, 
users can (by mistake) modify broadcast state while processing other input. 
This would lead to incorrect results which are very hard to notice, much less 
debug.

With the way the Flink state API works (users can get a {{State}} in {{open()}} 
and work with state by calling methods on that) we have to add special wrapping 
state classes that only allow modification of state when processing a broadcast 
element.
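
A minimal sketch of such a wrapping state class, written against hypothetical stand-in types so that it is self-contained: {{SimpleValueState}} mimics only the {{value()}}/{{update()}} part of {{ValueState}}, and {{BroadcastContext}} is an assumed helper that the operator would toggle around broadcast element processing. None of these names are part of Flink.

```java
/** Hypothetical stand-in for Flink's ValueState; only the two methods used here. */
interface SimpleValueState<T> {
    T value();

    void update(T value);
}

/** Plain in-memory state, standing in for state obtained from a state backend. */
final class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    @Override
    public T value() {
        return current;
    }

    @Override
    public void update(T value) {
        current = value;
    }
}

/** Assumed helper that tracks whether a broadcast element is being processed. */
final class BroadcastContext {
    private boolean processingBroadcast;

    boolean isProcessingBroadcast() {
        return processingBroadcast;
    }

    /** Runs the body with the flag set and restores the previous value afterwards. */
    void runAsBroadcast(Runnable body) {
        boolean previous = processingBroadcast;
        processingBroadcast = true;
        try {
            body.run();
        } finally {
            processingBroadcast = previous;
        }
    }
}

/** Wrapper that always allows reads but rejects writes outside broadcast processing. */
final class GuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BroadcastContext context;

    GuardedValueState(SimpleValueState<T> delegate, BroadcastContext context) {
        this.delegate = delegate;
        this.context = context;
    }

    @Override
    public T value() {
        return delegate.value();
    }

    @Override
    public void update(T value) {
        if (!context.isProcessingBroadcast()) {
            throw new IllegalStateException(
                "Broadcast state may only be modified while processing a broadcast element.");
        }
        delegate.update(value);
    }
}
```

Failing fast with an exception is one possible choice here; it turns a silent divergence between parallel instances into an immediate, visible error.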

For the API, I propose to add a new interface `InternalStateAccessor`:
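{code}
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
 * Interface for accessing persistent state.
 */
@PublicEvolving
public interface InternalStateAccessor {
  /** Returns the state for the given namespace and state descriptor. */
  <N, S extends State> S state(
      N namespace,
      TypeSerializer<N> namespaceSerializer,
      StateDescriptor<S, ?> stateDescriptor);
}
{code}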
This is the same as `KeyedStateBackend.getPartitionedState()` but allows us to 
abstract away the special nature of broadcast state.

{{AbstractStreamOperator}} would get a new method `getBroadcastStateAccessor()` 
that returns an implementation of this interface. The implementation would have 
a {{KeyedStateBackend}} but wrap the state in special wrappers that only allow 
modification when processing broadcast elements (as mentioned above). 

On the lower implementation levels, we have to add a new entry for our state to 
`OperatorSnapshotResult`. For example:
{code}
private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
{code}
Also, the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation logic 
will have to be adapted to support this new kind of state. With the ongoing 
changes in supporting incremental snapshotting and other new features for 
`KeyedStateBackend` this should be coordinated with [~StephanEwen] and/or 
[[email protected]] and/or [~xiaogang.shi]. We also have to be very 
careful about maintaining compatibility with savepoints from older versions.

  was:
We need this to support the pattern where you have one input of a two-input 
operator keyed and the other input carries messages that should be broadcast to 
all operator instances. This other (side) input can contain configuration 
updates, models, or what have you that are relevant for all elements in the 
other stream.

We should add something akin to {{KeyedStateBackend}} where state is kept by 
operator. When checkpointing, we only checkpoint the state of the first 
operator instance because we assume the state to be the same across all 
instances. When restoring we restore the one snapshot to all operator instances.

Current thinking is that this global/broadcast state backend should allow 
access by key and possibly also iteration over all stored keys.


> Add Support for Broadcast State
> -------------------------------
>
>                 Key: FLINK-4940
>                 URL: https://issues.apache.org/jira/browse/FLINK-4940
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
