[ https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas reassigned FLINK-4940:
-------------------------------------

    Assignee: Kostas Kloudas

> Add support for broadcast state
> -------------------------------
>
>                 Key: FLINK-4940
>                 URL: https://issues.apache.org/jira/browse/FLINK-4940
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Major
>
> As mentioned in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>  we need broadcast state to support job patterns where one (or several) 
> inputs are broadcast to all operator instances and where we keep state that 
> is mutated only based on elements from the broadcast inputs. This 
> restriction ensures that the broadcast state is the same on all parallel 
> operator instances when checkpointing (except in at-least-once mode, where 
> checkpoint barriers are not aligned and an instance may already have 
> processed broadcast elements that belong after the checkpoint). We 
> therefore only have to checkpoint the state of one arbitrary instance, for 
> example instance 0.
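A minimal plain-Java sketch of this argument, assuming broadcast state can be modelled as one {{Map<String, String>}} per parallel instance. {{BroadcastCheckpointSketch}}, {{snapshot}} and {{restore}} are hypothetical names, not Flink API. The restore step (every instance, at any new parallelism, receives its own copy of the single checkpointed map) is an assumption about how the one stored copy would be used; the description itself only covers the checkpoint side.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; these names are not Flink API.
final class BroadcastCheckpointSketch {

    private BroadcastCheckpointSketch() {}

    /**
     * Checkpoints broadcast state. Since every parallel instance holds the same
     * broadcast state (outside at-least-once mode), instance 0's copy is enough.
     */
    static Map<String, String> snapshot(List<Map<String, String>> perInstanceState) {
        return new HashMap<>(perInstanceState.get(0));
    }

    /**
     * Restores broadcast state for a possibly different parallelism by giving
     * each new instance its own independent copy of the single checkpointed map.
     */
    static List<Map<String, String>> restore(Map<String, String> checkpoint, int newParallelism) {
        List<Map<String, String>> restored = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new HashMap<>(checkpoint));
        }
        return restored;
    }
}
```

Handing out independent copies matters: after restore, each instance mutates its own map, so they stay equal only because they all apply the same broadcast elements.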
> For the different types of side inputs we need different types of state. 
> Luckily, the side input types align with the state types we currently have 
> for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes, but we need to 
> put a more restrictive API in front of them: mutation of broadcast state 
> must only be allowed while actually processing broadcast input. Without this 
> check, users could, by mistake, modify broadcast state outside of broadcast 
> processing. This would lead to incorrect results that are very hard to 
> notice, let alone debug.
> Because of the way the Flink state API works (users obtain a {{State}} in 
> {{open()}} and then work with state by calling methods on that object), a 
> check when the state is first obtained is not enough. We have to add special 
> wrapper state classes that only allow modification of state while 
> processing a broadcast element.
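The wrapper idea can be sketched without Flink, assuming a simplified, hypothetical {{SimpleValueState}} interface in place of Flink's {{ValueState}} and a {{BooleanSupplier}} that tells the wrapper whether a broadcast element is currently being processed. Reads are always permitted; writes outside broadcast processing fail fast with an {{IllegalStateException}} (the choice of exception is also an assumption).

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified stand-in for a Flink-style value state; not Flink API.
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

/** Plain in-memory value state used as the backing store in this sketch. */
final class HeapValueState<T> implements SimpleValueState<T> {
    private T current;

    @Override public T value() { return current; }

    @Override public void update(T newValue) { current = newValue; }
}

/**
 * Wraps a value state so that reads are always allowed, but writes are only
 * allowed while the operator is processing a broadcast element.
 */
final class BroadcastGuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BooleanSupplier processingBroadcast;

    BroadcastGuardedValueState(SimpleValueState<T> delegate, BooleanSupplier processingBroadcast) {
        this.delegate = delegate;
        this.processingBroadcast = processingBroadcast;
    }

    @Override public T value() {
        return delegate.value();
    }

    @Override public void update(T newValue) {
        // The check runs on every call, not when the state object is handed out.
        if (!processingBroadcast.getAsBoolean()) {
            throw new IllegalStateException(
                "Broadcast state may only be modified while processing a broadcast element.");
        }
        delegate.update(newValue);
    }
}
```

Because the check runs on every {{update()}} call rather than when the state is obtained, a state handle fetched once in {{open()}} remains safe to keep around.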
> For the API, I propose to add a new interface {{InternalStateAccessor}}:
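> {code}
> import org.apache.flink.annotation.PublicEvolving;
> import org.apache.flink.api.common.state.State;
> import org.apache.flink.api.common.state.StateDescriptor;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
>
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor {
>
>   /** Returns the state described by the descriptor, scoped to the given namespace. */
>   <N, S extends State> S state(
>       N namespace,
>       TypeSerializer<N> namespaceSerializer,
>       StateDescriptor<S, ?> stateDescriptor) throws Exception;
> }
> {code}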
> This is the same as {{KeyedStateBackend.getPartitionedState()}} but allows us 
> to abstract away the special nature of broadcast state. It is meant as an 
> internal interface and is not to be exposed to user functions; only 
> operators should deal with it.
> {{AbstractStreamOperator}} would get a new method 
> {{getBroadcastStateAccessor()}} that returns an implementation of this 
> interface. The implementation would be backed by a {{KeyedStateBackend}} but 
> wrap the state it returns in special wrappers that only allow modification 
> while processing broadcast elements (as described above).
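One possible shape for the accessor and the operator-side flag, again as a hypothetical plain-Java sketch: {{BroadcastStateAccessorSketch}}, {{GuardedMapState}}, {{mapState}}, {{processBroadcastElement}} and {{processElement}} are illustrative names, and a {{HashMap}} stands in for the {{KeyedStateBackend}}. The operator raises the flag only around broadcast processing, resets it in a {{finally}} block, and caches wrapped states so that repeated lookups return the same object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these classes are Flink API.
final class BroadcastStateAccessorSketch {

    /** True only while a broadcast element is being processed. */
    private boolean processingBroadcast;

    /** Backing storage per state name, standing in for a keyed state backend. */
    private final Map<String, Map<String, String>> backend = new HashMap<>();

    /** Wrapped states handed out so far, so repeated lookups return the same object. */
    private final Map<String, GuardedMapState> handedOut = new HashMap<>();

    /** Rough analogue of state(namespace, serializer, descriptor) for a map state. */
    GuardedMapState mapState(String name) {
        return handedOut.computeIfAbsent(name,
            n -> new GuardedMapState(backend.computeIfAbsent(n, k -> new HashMap<>())));
    }

    /** Runs broadcast-processing logic with modification enabled. */
    void processBroadcastElement(Runnable userLogic) {
        processingBroadcast = true;
        try {
            userLogic.run();
        } finally {
            processingBroadcast = false; // always reset, even if user code throws
        }
    }

    /** Runs regular-input logic; broadcast state stays read-only here. */
    void processElement(Runnable userLogic) {
        userLogic.run();
    }

    /** Map state view that rejects writes outside broadcast processing. */
    final class GuardedMapState {
        private final Map<String, String> delegate;

        private GuardedMapState(Map<String, String> delegate) {
            this.delegate = delegate;
        }

        String get(String key) {
            return delegate.get(key);
        }

        void put(String key, String value) {
            if (!processingBroadcast) {
                throw new IllegalStateException(
                    "Broadcast state can only be modified while processing broadcast input.");
            }
            delegate.put(key, value);
        }
    }
}
```

Keeping the flag inside the operator, rather than in user code, means user functions cannot accidentally leave modification enabled.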
> On the lower implementation levels, we have to add a new entry for our state 
> to {{OperatorSnapshotResult}}. For example:
> {code}
> private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
> {code}
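A hedged sketch of how such a field might be filled, using the JDK's {{FutureTask}} (a standard {{RunnableFuture}} implementation). The class {{SnapshotResultSketch}} is hypothetical, and a {{byte[]}} stands in for {{KeyGroupsStateHandle}}. In this sketch only subtask 0 creates a future, matching the idea that one instance's copy suffices; every other subtask leaves the field {{null}}.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch, not Flink API: byte[] stands in for KeyGroupsStateHandle.
final class SnapshotResultSketch {

    /** Future producing the managed broadcast state; null on every subtask except 0. */
    private final RunnableFuture<byte[]> broadcastStateManagedFuture;

    private SnapshotResultSketch(RunnableFuture<byte[]> broadcastStateManagedFuture) {
        this.broadcastStateManagedFuture = broadcastStateManagedFuture;
    }

    /**
     * Prepares a snapshot. Only subtask 0 creates a future; the writing itself is
     * deferred until the future is run (for example by an asynchronous thread).
     */
    static SnapshotResultSketch snapshot(int subtaskIndex, Map<String, String> broadcastState) {
        if (subtaskIndex != 0) {
            return new SnapshotResultSketch(null);
        }
        // Copy synchronously so later updates do not leak into this snapshot.
        Map<String, String> copy = new TreeMap<>(broadcastState);
        return new SnapshotResultSketch(new FutureTask<byte[]>(
            () -> copy.toString().getBytes(StandardCharsets.UTF_8)));
    }

    RunnableFuture<byte[]> getBroadcastStateManagedFuture() {
        return broadcastStateManagedFuture;
    }
}
```

The synchronous copy before deferring the write mirrors the usual split between a short synchronous snapshot phase and a longer asynchronous materialization phase.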
> The {{CheckpointCoordinator}} and the savepoint/checkpoint serialisation 
> logic will also have to be adapted to support this new kind of state. Given 
> the ongoing work on incremental snapshotting and other new features for 
> {{KeyedStateBackend}}, this should be coordinated with [~StephanEwen] and/or 
> [~stefanrichte...@gmail.com] and/or [~xiaogang.shi]. We also have to be very 
> careful about maintaining compatibility with savepoints from older versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
