Tzu-Li (Gordon) Tai created FLINK-11094:
-------------------------------------------
Summary: Restored state in RocksDBStateBackend that has not been
accessed in new execution causes NPE on snapshot
Key: FLINK-11094
URL: https://issues.apache.org/jira/browse/FLINK-11094
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Fix For: 1.7.1
This was caused by changes in FLINK-10679.
The problem is that, with that change, the {{RocksDBKeyedBackend}} no longer
creates {{RegisteredStateMetaInfoBase}}s eagerly for all restored state. They
are instead created lazily, only when the user accesses the state again. As a
result, restored state that is never accessed has no meta info, and taking a
snapshot of it throws an NPE.
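To illustrate the failure mode, here is a deliberately simplified model. All class and method names below are hypothetical stand-ins, not the actual backend code; the only assumption carried over from the description above is that meta info is filled in lazily on access and dereferenced on snapshot.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical demo; none of these names are Flink classes.
class LazyMetaInfoDemo {
    public static void main(String[] args) {
        RestoredBackendSketch backend = new RestoredBackendSketch();
        backend.restore("accessed-state");
        backend.restore("untouched-state");
        backend.access("accessed-state"); // only this state gets meta info

        try {
            backend.snapshot();
        } catch (NullPointerException e) {
            System.out.println("snapshot failed: a restored state has no meta info");
        }
    }
}

// Stand-in for the registered meta info of one state.
class MetaInfoSketch {
    final String stateName;

    MetaInfoSketch(String stateName) {
        this.stateName = stateName;
    }

    String describe() {
        return "meta(" + stateName + ")";
    }
}

// Stand-in for a backend that creates meta info lazily after a restore.
class RestoredBackendSketch {
    private final Map<String, MetaInfoSketch> metaInfoByState = new LinkedHashMap<>();

    // Restore only records that the state exists; meta info is left empty.
    void restore(String stateName) {
        metaInfoByState.put(stateName, null);
    }

    // Meta info is created when the user accesses the state.
    void access(String stateName) {
        metaInfoByState.put(stateName, new MetaInfoSketch(stateName));
    }

    // Snapshot visits every restored state and dereferences its meta info.
    int snapshot() {
        int written = 0;
        for (MetaInfoSketch metaInfo : metaInfoByState.values()) {
            metaInfo.describe(); // throws NullPointerException for untouched state
            written++;
        }
        return written;
    }
}
```

In this model the snapshot succeeds again once every restored state has been accessed, which is why the problem only shows up for state that the new execution never touches.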
The rationale behind FLINK-10679 was that, since
{{RegisteredStateMetaInfoBase}} already holds the serializer instances used for
state access, creating them eagerly at restore time from the restored serializer
snapshots did not make sense: at that point in time we do not yet have the new
serializers for state access, and the snapshot is only capable of creating the
previous state serializer.
I propose the following:
{{RegisteredStateMetaInfoBase}}s should hold a {{StateSerializerProvider}}
instead of final {{TypeSerializer}} instances.
The {{StateSerializerProvider}} would have the following methods:
{code}
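public abstract class StateSerializerProvider<T> {
    // Serializer currently used for state access.
    public abstract TypeSerializer<T> getCurrentSerializer();
    // Updates the serializer returned by getCurrentSerializer().
    public abstract TypeSerializer<T> updateCurrentSerializer(TypeSerializer<T> newSerializer);
    // Serializer that the restored state was written with.
    public abstract TypeSerializer<T> getPreviousSerializer();
}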
{code}
A {{StateSerializerProvider}} can be created from either:
1) A restored serializer snapshot, when restoring the state.
2) The serializer of a new state, when the state is registered for the first
time.
For 1), the provider of a state that has not been accessed after the restore
returns the same serializer (i.e. the previous serializer) from both
{{getPreviousSerializer}} and {{getCurrentSerializer}}. Once a restored state is
re-accessed, {{updateCurrentSerializer(TypeSerializer<T> newSerializer)}} should
be used to update the serializer that the provider returns from
{{getCurrentSerializer}}.
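A minimal sketch of these semantics follows. It is not the proposed implementation: {{SerializerSketch}} is a hypothetical stand-in for {{TypeSerializer}}, the factory method names are made up, and the restored serializer is passed in directly rather than being created from a serializer snapshot. Throwing from {{getPreviousSerializer}} for new state and returning the new serializer from {{updateCurrentSerializer}} are assumptions of the sketch; the proposal does not specify either.

```java
import java.util.Objects;

// Hypothetical demo class; nothing here is Flink's actual API.
class ProviderSketchDemo {
    public static void main(String[] args) {
        SerializerSketch<String> restored = new NamedSerializerSketch<>("restored");
        ProviderSketch<String> provider = ProviderSketch.fromRestored(restored);

        // Not accessed yet: current and previous are the same instance.
        System.out.println(provider.getCurrentSerializer() == provider.getPreviousSerializer());

        // The user re-accesses the state with a new serializer.
        provider.updateCurrentSerializer(new NamedSerializerSketch<>("new"));
        System.out.println(provider.getCurrentSerializer().name());
        System.out.println(provider.getPreviousSerializer().name());
    }
}

// Stand-in for TypeSerializer<T>, reduced to a name for illustration.
interface SerializerSketch<T> {
    String name();
}

final class NamedSerializerSketch<T> implements SerializerSketch<T> {
    private final String name;

    NamedSerializerSketch(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

final class ProviderSketch<T> {
    private final SerializerSketch<T> previous; // null for newly registered state
    private SerializerSketch<T> current;

    private ProviderSketch(SerializerSketch<T> previous, SerializerSketch<T> current) {
        this.previous = previous;
        this.current = current;
    }

    // Case 1): restored state. Until re-accessed, current == previous.
    static <T> ProviderSketch<T> fromRestored(SerializerSketch<T> restoredSerializer) {
        Objects.requireNonNull(restoredSerializer);
        return new ProviderSketch<>(restoredSerializer, restoredSerializer);
    }

    // Case 2): state registered for the first time. There is no previous serializer.
    static <T> ProviderSketch<T> fromNew(SerializerSketch<T> newSerializer) {
        Objects.requireNonNull(newSerializer);
        return new ProviderSketch<>(null, newSerializer);
    }

    SerializerSketch<T> getCurrentSerializer() {
        return current;
    }

    // Assumption of this sketch: returns the serializer that is now current.
    SerializerSketch<T> updateCurrentSerializer(SerializerSketch<T> newSerializer) {
        current = Objects.requireNonNull(newSerializer);
        return current;
    }

    // Assumption of this sketch: new state has no previous serializer, so this throws.
    SerializerSketch<T> getPreviousSerializer() {
        if (previous == null) {
            throw new IllegalStateException("no previous serializer for new state");
        }
        return previous;
    }
}
```

With a provider like this, a snapshot of non-accessed restored state always has a serializer to work with, because {{getCurrentSerializer}} falls back to the previous serializer.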
We could also use this new abstraction to move some of the compatibility
checks for the new serializer out of the state backend and into
{{StateSerializerProvider#updateCurrentSerializer}}.
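As a rough sketch of what that could look like, the update method itself can reject an incompatible new serializer, so that callers in the state backend no longer need their own check. Everything here is hypothetical: {{TaggedSerializerSketch}}, its {{canRead}} rule (equal format tags), and the choice of exception are placeholders for whatever compatibility logic the backend performs today.

```java
import java.util.Objects;

// Hypothetical demo; the compatibility rule below is a placeholder.
class CompatibilityCheckDemo {
    public static void main(String[] args) {
        CheckedProviderSketch provider =
                CheckedProviderSketch.fromRestored(new TaggedSerializerSketch("long"));

        // Same format as the restored serializer: accepted.
        provider.updateCurrentSerializer(new TaggedSerializerSketch("long"));

        try {
            // Different format: rejected inside the provider.
            provider.updateCurrentSerializer(new TaggedSerializerSketch("string"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Stand-in serializer identified only by a format tag.
final class TaggedSerializerSketch {
    final String formatTag;

    TaggedSerializerSketch(String formatTag) {
        this.formatTag = Objects.requireNonNull(formatTag);
    }

    // Placeholder rule: compatible only if both serializers share a format tag.
    boolean canRead(TaggedSerializerSketch other) {
        return formatTag.equals(other.formatTag);
    }
}

final class CheckedProviderSketch {
    private final TaggedSerializerSketch previous;
    private TaggedSerializerSketch current;

    private CheckedProviderSketch(TaggedSerializerSketch restored) {
        this.previous = restored;
        this.current = restored;
    }

    static CheckedProviderSketch fromRestored(TaggedSerializerSketch restored) {
        return new CheckedProviderSketch(Objects.requireNonNull(restored));
    }

    TaggedSerializerSketch getCurrentSerializer() {
        return current;
    }

    // The compatibility check lives here instead of in the caller.
    TaggedSerializerSketch updateCurrentSerializer(TaggedSerializerSketch newSerializer) {
        Objects.requireNonNull(newSerializer);
        if (!newSerializer.canRead(previous)) {
            throw new IllegalStateException(
                    "new serializer '" + newSerializer.formatTag
                            + "' is incompatible with restored '" + previous.formatTag + "'");
        }
        current = newSerializer;
        return current;
    }
}
```

A rejected update leaves the current serializer unchanged in this sketch, so the provider stays usable for snapshots even after a failed re-registration.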
For tests, we apparently lack coverage for restored state that is snapshotted
again without having been accessed. Such a test should be included as part of
the fix.