[ 
https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542770#comment-16542770
 ] 

ASF GitHub Bot commented on FLINK-9376:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202289229
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
                return Tuple2.of(stateInfo.f0, newMetaInfo);
        }
     
    +   private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, 
SV> migrateStateIfNecessary(
    +                   StateDescriptor<S, SV> stateDesc,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +           @SuppressWarnings("unchecked")
    +           RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> 
restoredMetaInfoSnapshot =
    +                   (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) 
restoredKvStateMetaInfos.get(
    +                           stateDesc.getName());
    +
    +           Preconditions.checkState(
    +                   restoredMetaInfoSnapshot != null,
    +                   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
    +                           " but its corresponding restored snapshot 
cannot be found.");
    +
    +           StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
    +
    +           TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +           RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                   stateDesc.getType(),
    +                   stateDesc.getName(),
    +                   namespaceSerializer,
    +                   stateSerializer);
    +
    +           // check compatibility results to determine if state migration 
is required
    +           TypeSerializerSchemaCompatibility<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +                   namespaceSerializer);
    +
    +           TypeSerializerSchemaCompatibility<SV> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +                   stateSerializer);
    +
    +           if (namespaceCompatibility.isIncompatible()) {
    +                   throw new UnsupportedOperationException(
    +                           "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
    +           }
    +
    +           if (stateCompatibility.isIncompatible()) {
    --- End diff --
    
    The handling of this branch could maybe go to it's own private method to 
break down this big monolithic method a bit.


> Allow upgrading to incompatible state serializers (state schema evolution)
> --------------------------------------------------------------------------
>
>                 Key: FLINK-9376
>                 URL: https://issues.apache.org/jira/browse/FLINK-9376
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> Currently, users have access to upgrade state serializers on the restore run 
> of a stateful job, as long as the upgraded new serializer remains backwards 
> compatible with all previous written data in the savepoint (i.e. it can read 
> all previous and current schema of serialized state objects).
> What is still lacking is the ability to upgrade to incompatible serializers. 
> Upon being registered an incompatible serializer for existing restored state, 
> that state needs to go through the process of -
>  1. read serialized state with the previous serializer
>  2. passing each deserialized state object through a “migration map 
> function”, and
>  3. writing back the state with the new serializer
> The availability of this process should be strictly limited to state 
> registrations that occur before the actual processing begins (e.g. in the 
> {{open}} or {{initializeState}} methods), so that we avoid performing these 
> operations during processing.
> How this procedure actually occurs, differs across different types of state 
> backends.
> For example, for state backends that eagerly deserialize / lazily serialize 
> state (e.g. {{HeapStateBackend}}), the job execution itself can be seen as a 
> "migration"; everything is deserialized to state objects on restore, and is 
> only serialized again, with the new serializer, on checkpoints.
> Therefore, for these state backends, the above process is irrelevant.
> On the other hand, for state backends that lazily deserialize / eagerly 
> serialize state (e.g. {{RocksDBStateBackend}}), the state evolution process 
> needs to happen for every state with a newly registered incompatible 
> serializer.
> Procedure 2. will allow even state type migrations, but that is out-of-scope 
> of this JIRA.
>  This ticket focuses only on procedures 1. and 3., where we try to enable 
> schema evolution without state type changes.
> This is an umbrella JIRA ticket that overlooks this feature, including a few 
> preliminary tasks that work towards enabling it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to