[ https://issues.apache.org/jira/browse/FLINK-11094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai updated FLINK-11094:
----------------------------------------
Summary: Restored state in RocksDBStateBackend that has not been accessed
in restored execution causes NPE on snapshot (was: Restored state in
RocksDBStateBackend that has not been accessed in new execution causes NPE on
snapshot)
> Restored state in RocksDBStateBackend that has not been accessed in restored
> execution causes NPE on snapshot
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-11094
> URL: https://issues.apache.org/jira/browse/FLINK-11094
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.7.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.7.1
>
>
> This was caused by changes in FLINK-10679.
> The problem is that, since that change, the {{RocksDBKeyedStateBackend}} no
> longer creates {{RegisteredStateMetaInfoBase}}s eagerly for all restored
> state. Instead, they are created lazily, only when the user accesses the state
> again. As a result, restored state that is never accessed has no meta info,
> and taking a snapshot of it throws an NPE.
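The lazy creation described above can be reduced to a small model of the failure. This is a minimal sketch, not Flink code: `Backend`, `MetaInfo`, `restore`, `access` and `snapshot` are hypothetical names. The snapshot iterates over every restored state but only finds meta info for states that were accessed, so the first untouched state dereferences `null`.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public final class LazyMetaInfoNpeSketch {

    /** Hypothetical stand-in for RegisteredStateMetaInfoBase; only holds a serializer name. */
    static final class MetaInfo {
        final String serializerName;
        MetaInfo(String serializerName) { this.serializerName = serializerName; }
    }

    /** Hypothetical, heavily simplified keyed backend. */
    static final class Backend {
        // Names of all states found in the checkpoint being restored.
        final Set<String> restoredStateNames = new LinkedHashSet<>();
        // Meta infos, created lazily only when user code accesses a state.
        final Map<String, MetaInfo> metaInfos = new HashMap<>();

        void restore(String... names) {
            for (String name : names) {
                restoredStateNames.add(name);
            }
        }

        void access(String name, String newSerializerName) {
            metaInfos.put(name, new MetaInfo(newSerializerName));
        }

        /** Writes one line per restored state; throws an NPE for any state never accessed. */
        String snapshot() {
            StringBuilder out = new StringBuilder();
            for (String name : restoredStateNames) {
                MetaInfo info = metaInfos.get(name); // null if the state was never accessed
                out.append(name).append('=').append(info.serializerName).append('\n');
            }
            return out.toString();
        }
    }
}
```

Restoring states `a` and `b` and accessing only `a` makes `snapshot()` fail on `b`. Accessing `b` as well makes the snapshot succeed, which is why the bug only shows up for untouched restored state.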
> The rationale behind FLINK-10679 was that {{RegisteredStateMetaInfoBase}}
> already holds the serializer instances used for state access. Creating it
> eagerly at restore time from the restored serializer snapshots therefore did
> not make sense: at that point in time, the new serializers for state access
> are not yet available, and a serializer snapshot can only recreate the
> previous state serializer.
> I propose the following:
> Instead of holding final {{TypeSerializer}} instances,
> {{RegisteredStateMetaInfoBase}}s should hold a {{StateSerializerProvider}}.
> The {{StateSerializerProvider}} would have the following methods:
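> {code}
> import org.apache.flink.api.common.typeutils.TypeSerializer;
>
> public abstract class StateSerializerProvider<T> {
>     // Serializer used for state access in the current execution.
>     public abstract TypeSerializer<T> getCurrentSerializer();
>     // Sets the serializer provided on re-access and returns the serializer now in use.
>     public abstract TypeSerializer<T> updateCurrentSerializer(TypeSerializer<T> newSerializer);
>     // Serializer of the previous execution, recreated from the restored snapshot.
>     public abstract TypeSerializer<T> getPreviousSerializer();
> }
> {code}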
> A {{StateSerializerProvider}} can be created either from:
> 1) a restored serializer snapshot, when restoring the state, or
> 2) a new state's serializer, when registering the state for the first time.
> For 1), state that has not been accessed after the restore returns the same
> serializer (i.e. the previous serializer) from both {{getPreviousSerializer}}
> and {{getCurrentSerializer}}. Once a restored state is accessed again,
> {{updateCurrentSerializer(TypeSerializer<T> newSerializer)}} should be used
> to update the serializer that the provider returns from
> {{getCurrentSerializer}}.
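The two creation paths above can be sketched with simplified stand-in types. This is a hypothetical model, not Flink's implementation: `Serializer`, `SerializerSnapshot`, `fromRestoredSnapshot` and `fromNewSerializer` are illustrative names. The proposal does not say what `getPreviousSerializer` should return for brand-new state; throwing an exception here is an assumption.

```java
import java.util.Objects;

public final class StateSerializerProviderSketch {

    /** Hypothetical stand-in for TypeSerializer; only carries a version tag. */
    static final class Serializer<T> {
        final String version;
        Serializer(String version) { this.version = Objects.requireNonNull(version); }
    }

    /** Hypothetical stand-in for a restored serializer snapshot. */
    interface SerializerSnapshot<T> {
        Serializer<T> restoreSerializer();
    }

    static final class Provider<T> {
        private final Serializer<T> previousSerializer; // null for brand-new state
        private Serializer<T> currentSerializer;

        private Provider(Serializer<T> previous, Serializer<T> current) {
            this.previousSerializer = previous;
            this.currentSerializer = current;
        }

        /** Case 1): restored state; "current" stays the previous serializer until re-access. */
        static <T> Provider<T> fromRestoredSnapshot(SerializerSnapshot<T> snapshot) {
            Serializer<T> previous = snapshot.restoreSerializer();
            return new Provider<>(previous, previous);
        }

        /** Case 2): state registered for the first time; there is no previous serializer. */
        static <T> Provider<T> fromNewSerializer(Serializer<T> serializer) {
            return new Provider<>(null, Objects.requireNonNull(serializer));
        }

        Serializer<T> getCurrentSerializer() { return currentSerializer; }

        Serializer<T> getPreviousSerializer() {
            if (previousSerializer == null) {
                // Assumption: asking for a previous serializer of new state is an error.
                throw new IllegalStateException("state was not restored; no previous serializer");
            }
            return previousSerializer;
        }

        /** Called when restored state is accessed again with the job's new serializer. */
        Serializer<T> updateCurrentSerializer(Serializer<T> newSerializer) {
            currentSerializer = Objects.requireNonNull(newSerializer);
            return currentSerializer;
        }
    }
}
```

Because a restored provider always has a current serializer, a snapshot can be taken for untouched state without special-casing it.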
> We could also use this new abstraction to move some of the compatibility
> checks for the new serializer out of the state backend and into
> {{StateSerializerProvider#updateCurrentSerializer}}.
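Moving the compatibility check into `updateCurrentSerializer` could look roughly like the following. This is a hypothetical sketch: the `Compatibility` enum only loosely mirrors the as-is / after-migration / incompatible outcomes of Flink's `TypeSerializerSchemaCompatibility`, and the version-based rule in `VersionedSnapshot` is an invented example, not how Flink resolves compatibility.

```java
import java.util.Objects;

public final class CompatCheckSketch {

    /** Hypothetical, simplified outcome of a compatibility check. */
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    /** Hypothetical stand-in for a serializer, identified only by a format version. */
    static final class Serializer<T> {
        final int version;
        Serializer(int version) { this.version = version; }
    }

    /** Hypothetical stand-in for a restored serializer snapshot. */
    interface SerializerSnapshot<T> {
        Serializer<T> restoreSerializer();
        Compatibility resolveCompatibility(Serializer<T> newSerializer);
    }

    /** Invented rule: same version is as-is, newer needs migration, older is rejected. */
    static final class VersionedSnapshot<T> implements SerializerSnapshot<T> {
        final int version;
        VersionedSnapshot(int version) { this.version = version; }

        @Override
        public Serializer<T> restoreSerializer() { return new Serializer<>(version); }

        @Override
        public Compatibility resolveCompatibility(Serializer<T> newSerializer) {
            if (newSerializer.version == version) return Compatibility.COMPATIBLE_AS_IS;
            if (newSerializer.version > version) return Compatibility.COMPATIBLE_AFTER_MIGRATION;
            return Compatibility.INCOMPATIBLE;
        }
    }

    /** Provider for restored state that performs the compatibility check itself. */
    static final class RestoredProvider<T> {
        private final SerializerSnapshot<T> previousSnapshot;
        private Serializer<T> currentSerializer;
        private boolean requiresMigration = false;

        RestoredProvider(SerializerSnapshot<T> previousSnapshot) {
            this.previousSnapshot = Objects.requireNonNull(previousSnapshot);
            // Until the state is accessed again, "current" is the restored (previous) serializer.
            this.currentSerializer = previousSnapshot.restoreSerializer();
        }

        Serializer<T> getCurrentSerializer() { return currentSerializer; }

        Serializer<T> getPreviousSerializer() { return previousSnapshot.restoreSerializer(); }

        /** Checks the new serializer against the restored snapshot before accepting it. */
        Serializer<T> updateCurrentSerializer(Serializer<T> newSerializer) {
            Compatibility result = previousSnapshot.resolveCompatibility(newSerializer);
            if (result == Compatibility.INCOMPATIBLE) {
                // Reject without changing the current serializer.
                throw new IllegalStateException(
                        "new serializer v" + newSerializer.version + " is incompatible with restored state");
            }
            requiresMigration = (result == Compatibility.COMPATIBLE_AFTER_MIGRATION);
            currentSerializer = newSerializer;
            return currentSerializer;
        }

        /** Lets the backend decide whether restored bytes must be rewritten with the new serializer. */
        boolean requiresMigration() { return requiresMigration; }
    }
}
```

With this split, the backend only needs to ask `requiresMigration()` after the update instead of resolving compatibility itself.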
> Regarding tests, we apparently lack coverage for restored state that is not
> accessed before being snapshotted again. Such a test should be included as
> part of the fix.
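The missing test case can be outlined against a toy backend that creates meta info eagerly on restore. This is only a sketch of the scenario's shape under assumed, hypothetical types (`Backend`, `Provider`, `restoredButNotAccessedStateCanBeSnapshotted`); a real regression test would exercise the RocksDB backend through Flink's own test harnesses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class UnaccessedRestoredStateSketch {

    /** Hypothetical stand-in for a serializer, identified by a version tag. */
    static final class Serializer {
        final String version;
        Serializer(String version) { this.version = version; }
    }

    /** Hypothetical provider: "current" defaults to the restored (previous) serializer. */
    static final class Provider {
        final Serializer previous;
        Serializer current;
        Provider(Serializer restored) { this.previous = restored; this.current = restored; }
        void updateCurrentSerializer(Serializer s) { current = s; }
    }

    /** Hypothetical backend that creates meta info (here: a provider) eagerly on restore. */
    static final class Backend {
        final Map<String, Provider> metaInfos = new LinkedHashMap<>();

        void restore(Map<String, Serializer> restoredSerializers) {
            restoredSerializers.forEach((name, s) -> metaInfos.put(name, new Provider(s)));
        }

        void access(String name, Serializer newSerializer) {
            metaInfos.get(name).updateCurrentSerializer(newSerializer);
        }

        /** Maps each registered state name to the serializer version written to the snapshot. */
        Map<String, String> snapshot() {
            Map<String, String> out = new LinkedHashMap<>();
            metaInfos.forEach((name, p) -> out.put(name, p.current.version));
            return out;
        }
    }

    /** Scenario: restore two states, access only one, then snapshot again. */
    static void restoredButNotAccessedStateCanBeSnapshotted() {
        Map<String, Serializer> restored = new LinkedHashMap<>();
        restored.put("accessed", new Serializer("v1"));
        restored.put("untouched", new Serializer("v1"));

        Backend backend = new Backend();
        backend.restore(restored);
        backend.access("accessed", new Serializer("v2"));

        Map<String, String> written = backend.snapshot(); // must not throw
        if (!"v2".equals(written.get("accessed"))) {
            throw new AssertionError("accessed state should use the new serializer");
        }
        if (!"v1".equals(written.get("untouched"))) {
            throw new AssertionError("untouched state should keep the previous serializer");
        }
    }

    public static void main(String[] args) {
        restoredButNotAccessedStateCanBeSnapshotted();
        System.out.println("ok");
    }
}
```

The key assertion is on the untouched state: it must be written with the previous serializer rather than failing, which is exactly the case the lazy meta-info creation broke.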
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)