[
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek updated FLINK-4940:
------------------------------------
Summary: Add support for broadcast state (was: Add Support for Broadcast
State)
> Add support for broadcast state
> -------------------------------
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
>
> As mentioned in
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> we need broadcast state to support job patterns where one (or several)
> inputs are broadcast to all operator instances and where we keep state that
> is mutated only based on input from broadcast inputs. This special
> restriction ensures that the broadcast state is the same on all parallel
> operator instances when checkpointing (except when using at-least-once mode).
> We therefore only have to checkpoint the state of one arbitrary instance, for
> example instance 0.
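A minimal sketch of this idea in plain Java, not Flink code. The names `BroadcastSnapshotSketch`, `snapshot` and `restore` are hypothetical. Restoring by handing a copy of the snapshot to every instance is an assumption; the issue only states that one instance needs to be checkpointed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: broadcast state modelled as one map per parallel instance. */
final class BroadcastSnapshotSketch {

    /**
     * Snapshots only the state of instance 0. This is valid only if every
     * instance holds identical broadcast state at checkpoint time.
     */
    static Map<String, String> snapshot(List<Map<String, String>> perInstanceState) {
        return new HashMap<>(perInstanceState.get(0));
    }

    /**
     * Assumed restore strategy: every instance receives its own copy of the
     * single snapshot, so the restored parallelism may differ from the original.
     */
    static List<Map<String, String>> restore(Map<String, String> snapshot, int parallelism) {
        List<Map<String, String>> restored = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            restored.add(new HashMap<>(snapshot));
        }
        return restored;
    }
}
```

Because each restored instance gets an independent copy, a later local change on one instance cannot leak into another.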
> For the different types of side inputs we need different types of state.
> Luckily, the side input types align with the state types we currently have
> for keyed state:
> - {{ValueState}}
> - {{ListState}}
> - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put
> a more restrictive API in front of them: mutation of broadcast state must only
> be allowed when actually processing broadcast input. Without this check,
> users can (by mistake) modify broadcast state. This would lead to
> incorrect results which are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in
> {{open()}} and work with state by calling methods on that) we have to add
> special wrapping state classes that only allow modification of state when
> processing a broadcast element.
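A minimal sketch of such a wrapper in plain Java. All types here are hypothetical stand-ins: `SimpleValueState` is a reduced substitute for Flink's `ValueState`, and `BroadcastContext` assumes the operator sets a flag while it processes a broadcast element. The issue does not prescribe this mechanism.

```java
/** Hypothetical stand-in for Flink's ValueState, reduced to two methods. */
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

/** Plain in-memory state, standing in for state held by a state backend. */
final class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    @Override
    public T value() {
        return current;
    }

    @Override
    public void update(T newValue) {
        current = newValue;
    }
}

/** Flag the operator would set around the processing of a broadcast element (assumption). */
final class BroadcastContext {
    private boolean processingBroadcast;

    void enter() {
        processingBroadcast = true;
    }

    void exit() {
        processingBroadcast = false;
    }

    boolean isProcessingBroadcast() {
        return processingBroadcast;
    }
}

/** Wrapper that always allows reads but rejects writes outside broadcast processing. */
final class GuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BroadcastContext context;

    GuardedValueState(SimpleValueState<T> delegate, BroadcastContext context) {
        this.delegate = delegate;
        this.context = context;
    }

    @Override
    public T value() {
        return delegate.value();
    }

    @Override
    public void update(T newValue) {
        if (!context.isProcessingBroadcast()) {
            throw new IllegalStateException(
                "Broadcast state may only be modified while processing a broadcast element");
        }
        delegate.update(newValue);
    }
}
```

With this shape, a user function can still obtain the state object in `open()`, but an accidental `update()` from a non-broadcast input fails immediately instead of silently diverging the state across instances.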
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor {
>     <N, S extends State> S state(
>         N namespace,
>         TypeSerializer<N> namespaceSerializer,
>         StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> This is the same as `KeyedStateBackend.getPartitionedState()` but allows us
> to abstract away the special nature of broadcast state. This is also meant as
> an internal interface and is not to be exposed to user functions. Only
> operators should deal with this.
> {{AbstractStreamOperator}} would get a new method
> `getBroadcastStateAccessor()` that returns an implementation of this
> interface. The implementation would have a {{KeyedStateBackend}} but wrap the
> state in special wrappers that only allow modification when processing
> broadcast elements (as mentioned above).
> On the lower implementation levels, we have to add a new entry for our state
> to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
> {code}
> Also, the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation
> logic will have to be adapted to support this new kind of state. With the
> ongoing changes in supporting incremental snapshotting and other new features
> for `KeyedStateBackend` this should be coordinated with [~StephanEwen] and/or
> [[email protected]] and/or [~xiaogang.shi]. We also have to be very
> careful about maintaining compatibility with savepoints from older versions.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)