[ 
https://issues.apache.org/jira/browse/FLINK-11094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11094:
----------------------------------------
    Description: 
This was caused by changes in FLINK-10679.

The problem is that, since that change, {{RocksDBKeyedStateBackend}} no longer
creates {{RegisteredStateMetaInfoBase}}s eagerly for all restored state; instead,
they are created lazily, only when the user accesses the state again. As a
result, restored state that is never accessed has empty meta info, and taking a
snapshot of it throws an NPE.
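For illustration, the failure mode can be reduced to the following sketch. It is not Flink code: {{LazyBackendSketch}}, {{MetaInfo}} and the map layout are hypothetical simplifications that assume the backend tracks restored state names separately from the lazily created meta info, and that a snapshot walks all restored state.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for RegisteredStateMetaInfoBase: a state name plus a serializer id.
final class MetaInfo {
    final String stateName;
    final String serializerId;

    MetaInfo(String stateName, String serializerId) {
        this.stateName = stateName;
        this.serializerId = serializerId;
    }
}

// Hypothetical, heavily simplified backend mimicking the lazy behavior introduced by FLINK-10679.
final class LazyBackendSketch {
    // Names of all state found in the checkpoint; filled at restore time.
    private final Set<String> restoredStateNames = new LinkedHashSet<>();
    // Meta info; filled only when the user accesses a state again.
    private final Map<String, MetaInfo> metaInfos = new HashMap<>();

    void restore(String... stateNames) {
        for (String name : stateNames) {
            restoredStateNames.add(name); // note: no MetaInfo is created here
        }
    }

    void access(String stateName, String newSerializerId) {
        metaInfos.put(stateName, new MetaInfo(stateName, newSerializerId));
    }

    // Writes "name:serializerId" for every restored state.
    List<String> snapshot() {
        List<String> written = new ArrayList<>();
        for (String name : restoredStateNames) {
            MetaInfo info = metaInfos.get(name); // null for never-accessed state
            written.add(info.stateName + ":" + info.serializerId); // NPE when info is null
        }
        return written;
    }
}

class LazyBackendDemo {
    public static void main(String[] args) {
        LazyBackendSketch backend = new LazyBackendSketch();
        backend.restore("accessed", "untouched");
        backend.access("accessed", "serializer-v2");
        try {
            backend.snapshot();
        } catch (NullPointerException e) {
            System.out.println("snapshot failed: never-accessed state has no meta info");
        }
    }
}
{code}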

The rationale behind FLINK-10679 was that {{RegisteredStateMetaInfoBase}}
already holds the serializer instances used for state access, so creating the
meta infos eagerly at restore time from the restored serializer snapshots did
not make sense: at that point in time, the new serializers for state access are
not yet available, and a serializer snapshot can only recreate the previous
state serializer.

I propose the following:

Instead of holding final {{TypeSerializer}} instances, subclasses of
{{RegisteredStateMetaInfoBase}} should hold a {{StateSerializerProvider}}.

The {{StateSerializerProvider}} would have the following methods:
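{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;

public abstract class StateSerializerProvider<T> {
    // Serializer to use for accessing (reading/writing) the state right now.
    public abstract TypeSerializer<T> getCurrentSerializer();
    // Replaces the current serializer once the state is accessed with a new one.
    public abstract void updateCurrentSerializer(TypeSerializer<T> newSerializer);
    // Serializer restored from the checkpointed serializer snapshot.
    public abstract TypeSerializer<T> getPreviousSerializer();
}
{code}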
A {{StateSerializerProvider}} can be created from either:
 1) a restored serializer snapshot, when restoring the state, or
 2) the new state serializer, when registering the state for the first time.

For 1), as long as the state has not been accessed since the restore, the
provider returns the same serializer (i.e. the previous serializer) from both
{{getPreviousSerializer}} and {{getCurrentSerializer}}. Once the restored state
is accessed again, {{updateCurrentSerializer(TypeSerializer<T> newSerializer)}}
should be called to update the serializer that {{getCurrentSerializer}}
returns.
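One possible implementation of the two creation paths and the update described above, as a sketch only. To keep it self-contained, {{MiniSerializer}} and {{MiniSerializerSnapshot}} are hypothetical stand-ins for Flink's {{TypeSerializer}} and its serializer snapshot, the factory method names are made up, and throwing from {{getPreviousSerializer}} for state that was never restored is an assumed design choice that the proposal does not specify.

{code:java}
import java.util.Objects;

// Hypothetical stand-in for a TypeSerializer; only identifies a serialization format.
final class MiniSerializer<T> {
    final String format;

    MiniSerializer(String format) { this.format = format; }

    MiniSerializerSnapshot<T> snapshot() { return new MiniSerializerSnapshot<>(format); }
}

// Hypothetical stand-in for a serializer snapshot: can only recreate the serializer it came from.
final class MiniSerializerSnapshot<T> {
    final String format;

    MiniSerializerSnapshot(String format) { this.format = format; }

    MiniSerializer<T> restoreSerializer() { return new MiniSerializer<>(format); }
}

// Sketch of the proposed provider semantics (not the actual Flink class).
final class SerializerProviderSketch<T> {
    private final MiniSerializer<T> previousSerializer; // null for brand-new state
    private MiniSerializer<T> currentSerializer;

    private SerializerProviderSketch(MiniSerializer<T> previous, MiniSerializer<T> current) {
        this.previousSerializer = previous;
        this.currentSerializer = current;
    }

    // Case 1): restored state; until it is accessed again, current == previous.
    static <T> SerializerProviderSketch<T> fromRestoredSnapshot(MiniSerializerSnapshot<T> snapshot) {
        MiniSerializer<T> restored = snapshot.restoreSerializer();
        return new SerializerProviderSketch<>(restored, restored);
    }

    // Case 2): state registered for the first time; there is no previous serializer.
    static <T> SerializerProviderSketch<T> fromNewSerializer(MiniSerializer<T> serializer) {
        return new SerializerProviderSketch<>(null, Objects.requireNonNull(serializer));
    }

    MiniSerializer<T> getCurrentSerializer() { return currentSerializer; }

    MiniSerializer<T> getPreviousSerializer() {
        if (previousSerializer == null) {
            throw new IllegalStateException("state was never restored; no previous serializer");
        }
        return previousSerializer;
    }

    // Called once restored state is accessed again with the new serializer.
    void updateCurrentSerializer(MiniSerializer<T> newSerializer) {
        currentSerializer = Objects.requireNonNull(newSerializer);
    }
}

class ProviderDemo {
    public static void main(String[] args) {
        MiniSerializer<String> v1 = new MiniSerializer<>("v1");
        SerializerProviderSketch<String> provider =
                SerializerProviderSketch.fromRestoredSnapshot(v1.snapshot());
        System.out.println(provider.getCurrentSerializer().format); // v1
        provider.updateCurrentSerializer(new MiniSerializer<>("v2"));
        System.out.println(provider.getCurrentSerializer().format + " "
                + provider.getPreviousSerializer().format); // v2 v1
    }
}
{code}

Sharing one restored instance as both "previous" and "current" until {{updateCurrentSerializer}} is called is what lets never-accessed state still be snapshotted with a valid serializer.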

We could also use this new abstraction to move some of the new serializer's
compatibility checks out of the state backend and into
{{StateSerializerProvider#updateCurrentSerializer}}.
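A sketch of how such a check could live in {{updateCurrentSerializer}} rather than in the backend. Everything here is hypothetical: {{VersionedSerializer}} stands in for a real serializer, the {{Compatibility}} enum only loosely mirrors the kinds of outcomes Flink's schema compatibility checks distinguish, the version-based rule is invented for the example, and returning the result (instead of {{void}} as in the proposed signature) is an assumption so that the backend can still decide whether to migrate state.

{code:java}
import java.util.Objects;

// Hypothetical result type; loosely modeled on schema-compatibility outcomes.
enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

// Hypothetical serializer stand-in: a format name plus a schema version.
final class VersionedSerializer {
    final String format;
    final int version;

    VersionedSerializer(String format, int version) {
        this.format = format;
        this.version = version;
    }
}

// Sketch of a provider that owns the compatibility check instead of the backend.
final class CheckingProviderSketch {
    private final VersionedSerializer previous;
    private VersionedSerializer current;

    CheckingProviderSketch(VersionedSerializer restored) {
        this.previous = Objects.requireNonNull(restored);
        this.current = restored;
    }

    // Hypothetical rule: same format and version -> as-is; same format, newer
    // version -> migrate; different format or older version -> incompatible.
    private Compatibility check(VersionedSerializer candidate) {
        if (!candidate.format.equals(previous.format) || candidate.version < previous.version) {
            return Compatibility.INCOMPATIBLE;
        }
        return candidate.version == previous.version
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.COMPATIBLE_AFTER_MIGRATION;
    }

    // Checks the new serializer against the restored one and installs it only if usable.
    Compatibility updateCurrentSerializer(VersionedSerializer newSerializer) {
        Compatibility result = check(Objects.requireNonNull(newSerializer));
        if (result == Compatibility.INCOMPATIBLE) {
            throw new IllegalStateException("new serializer is incompatible with restored state");
        }
        current = newSerializer;
        return result;
    }

    VersionedSerializer getCurrentSerializer() { return current; }

    VersionedSerializer getPreviousSerializer() { return previous; }
}

class CheckingProviderDemo {
    public static void main(String[] args) {
        CheckingProviderSketch provider = new CheckingProviderSketch(new VersionedSerializer("fmtA", 1));
        System.out.println(provider.updateCurrentSerializer(new VersionedSerializer("fmtA", 2)));
    }
}
{code}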

As for tests, we apparently lack coverage for restored state that is not
accessed before being snapshotted again. A test for this case should be
included as part of the fix.
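The missing test case could look roughly like the following plain-Java sketch (no test framework; {{EagerBackendSketch}}, {{ProviderStub}} and the string-encoded serializers are hypothetical). It restores two states, accesses only one of them with a new serializer, takes a snapshot, and checks that the untouched state is still written with its previous serializer instead of failing.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal provider: "current" starts out as the restored (previous) serializer.
final class ProviderStub {
    final String previous;
    String current;

    ProviderStub(String restored) {
        this.previous = restored;
        this.current = restored;
    }
}

// Hypothetical backend where per-state providers are created eagerly at restore time.
final class EagerBackendSketch {
    private final Map<String, ProviderStub> providers = new LinkedHashMap<>();

    void restore(Map<String, String> stateToSerializer) {
        for (Map.Entry<String, String> e : stateToSerializer.entrySet()) {
            providers.put(e.getKey(), new ProviderStub(e.getValue()));
        }
    }

    void access(String stateName, String newSerializer) {
        providers.get(stateName).current = newSerializer;
    }

    // Writes "name:serializer" for every registered state, accessed or not.
    List<String> snapshot() {
        List<String> written = new ArrayList<>();
        for (Map.Entry<String, ProviderStub> e : providers.entrySet()) {
            written.add(e.getKey() + ":" + e.getValue().current);
        }
        return written;
    }
}

// The regression scenario: restore two states, access one, snapshot both.
class UnaccessedStateSnapshotCheck {
    static List<String> run() {
        Map<String, String> restored = new LinkedHashMap<>();
        restored.put("accessed", "v1");
        restored.put("untouched", "v1");
        EagerBackendSketch backend = new EagerBackendSketch();
        backend.restore(restored);
        backend.access("accessed", "v2");
        return backend.snapshot(); // must not throw for "untouched"
    }

    public static void main(String[] args) {
        List<String> written = run();
        if (!written.contains("untouched:v1")) {
            throw new AssertionError("never-accessed state missing from snapshot: " + written);
        }
        System.out.println(written); // [accessed:v2, untouched:v1]
    }
}
{code}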


> Restored state in RocksDBStateBackend that has not been accessed in restored 
> execution causes NPE on snapshot
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11094
>                 URL: https://issues.apache.org/jira/browse/FLINK-11094
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.7.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.7.1
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
