[ https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542771#comment-16542771 ]

ASF GitHub Bot commented on FLINK-9376:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202293477
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
                return Tuple2.of(stateInfo.f0, newMetaInfo);
        }
     
    +   private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
    +                   StateDescriptor<S, SV> stateDesc,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +           @SuppressWarnings("unchecked")
    +           RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
    +                   (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +                           stateDesc.getName());
    +
    +           Preconditions.checkState(
    +                   restoredMetaInfoSnapshot != null,
    +                   "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +                           " but its corresponding restored snapshot cannot be found.");
    +
    +           StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +           TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +           RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +                   stateDesc.getType(),
    +                   stateDesc.getName(),
    +                   namespaceSerializer,
    +                   stateSerializer);
    +
    +           // check compatibility results to determine if state migration is required
    +           TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +                   restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +                   namespaceSerializer);
    +
    +           TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +                   restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +                   stateSerializer);
    +
    +           if (namespaceCompatibility.isIncompatible()) {
    +                   throw new UnsupportedOperationException(
    +                           "Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +           }
    +
    +           if (stateCompatibility.isIncompatible()) {
    +                   if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
    +                           throw new UnsupportedOperationException(
    +                                   "Changing the TypeSerializers of a MapState in an incompatible way is currently not supported.");
    +                   }
    +
    +                   LOG.info(
    +                           "Performing state migration for state {} because the state serializer changed in an incompatible way.",
    +                           stateDesc);
    +
    +                   // we need to get an actual state instance because migration is different
    +                   // for different state types. For example, ListState needs to deal with
    +                   // individual elements
    +                   StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
    +                   if (stateFactory == null) {
    +                           String message = String.format("State %s is not supported by %s",
    +                                   stateDesc.getClass(), this.getClass());
    +                           throw new FlinkRuntimeException(message);
    +                   }
    +
    +                   State state = stateFactory.createState(
    +                           stateDesc,
    +                           Tuple2.of(stateInfo.f0, newMetaInfo),
    +                           RocksDBKeyedStateBackend.this);
    +
    +                   if (!(state instanceof AbstractRocksDBState)) {
    +                           throw new FlinkRuntimeException(
    +                                   "State should be an AbstractRocksDBState but is " + state);
    +                   }
    +
    +                   @SuppressWarnings("unchecked")
    +                   AbstractRocksDBState<?, N, ?, S> rocksDBState = (AbstractRocksDBState<?, N, ?, S>) state;
    +
    +                   Snapshot rocksDBSnapshot = null;
    +                   RocksIteratorWrapper iterator = null;
    +
    +                   try (ReadOptions readOptions = new ReadOptions()) {
    +                           // TODO: can I do this with try-with-resource or do I always have to call
    +                           // db.releaseSnapshot()?
    +                           // I think I can't use try-with-resource anyway, since I have to set the snapshot
    +                           // on the ReadOptions
    +
    +                           rocksDBSnapshot = db.getSnapshot();
    +                           readOptions.setSnapshot(rocksDBSnapshot);
    +
    +                           iterator = getRocksIterator(db, stateInfo.f0, readOptions);
    +                           iterator.seekToFirst();
    +
    +                           while (iterator.isValid()) {
    +
    +                                   byte[] serializedValue = iterator.value();
    +
    +                                   byte[] migratedSerializedValue = rocksDBState.migrateSerializedValue(
    +                                           serializedValue,
    +                                           restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot().restoreSerializer(),
    +                                           stateDesc.getSerializer());
    +
    +                                   db.put(stateInfo.f0, iterator.key(), migratedSerializedValue);
    +
    +                                   iterator.next();
    +                           }
    +                   } finally {
    +                           if (iterator != null) {
    +                                   iterator.close();
    +                           }
    +                           if (rocksDBSnapshot != null) {
    +                                   db.releaseSnapshot(rocksDBSnapshot);
    +                                   // TODO: do I need to call close() or is calling db.releaseSnapshot() enough?
    +                                   rocksDBSnapshot.close();
    --- End diff --
    
    It is backed by a native object, so always call `close` if the method is there.
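
To make the recommended ordering concrete, here is a minimal, self-contained sketch of the cleanup discussed above. The `Db` and `Snapshot` classes are hypothetical stand-ins, not the real `org.rocksdb` API: the point is that the snapshot must first be released back to the DB, and only then have its own native handle closed, which is why the reviewed code uses a `finally` block rather than try-with-resources for the snapshot.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch with stand-in classes (hypothetical, not org.rocksdb): shows the
// release-then-close ordering for a native-backed snapshot handle.
public class SnapshotCleanup {

    static final List<String> EVENTS = new ArrayList<>();

    // Stand-in for a native-backed snapshot handle that must be close()d.
    static class Snapshot implements AutoCloseable {
        @Override
        public void close() {
            EVENTS.add("snapshot.close");
        }
    }

    // Stand-in for the database handle that hands out snapshots.
    static class Db {
        Snapshot getSnapshot() {
            EVENTS.add("getSnapshot");
            return new Snapshot();
        }

        void releaseSnapshot(Snapshot snapshot) {
            EVENTS.add("releaseSnapshot");
        }
    }

    public static void main(String[] args) {
        Db db = new Db();
        Snapshot snapshot = null;
        try {
            snapshot = db.getSnapshot();
            // ... iterate over the column family and rewrite values here ...
        } finally {
            if (snapshot != null) {
                db.releaseSnapshot(snapshot); // unregister from the DB first
                snapshot.close();             // then free the native handle
            }
        }
    }
}
```

This mirrors what the `finally` block in the reviewed code ends up doing: since the snapshot is backed by a native object, `close` is called unconditionally once it has been released.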


> Allow upgrading to incompatible state serializers (state schema evolution)
> --------------------------------------------------------------------------
>
>                 Key: FLINK-9376
>                 URL: https://issues.apache.org/jira/browse/FLINK-9376
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> Currently, users are able to upgrade state serializers on the restore run 
> of a stateful job, as long as the upgraded new serializer remains backwards 
> compatible with all data previously written to the savepoint (i.e. it can read 
> all previous and current schemas of serialized state objects).
> What is still lacking is the ability to upgrade to incompatible serializers. 
> When an incompatible serializer is registered for existing restored state, 
> that state needs to go through the following process:
>  1. read the serialized state with the previous serializer
>  2. pass each deserialized state object through a "migration map function", and
>  3. write the state back with the new serializer
> The availability of this process should be strictly limited to state 
> registrations that occur before the actual processing begins (e.g. in the 
> {{open}} or {{initializeState}} methods), so that we avoid performing these 
> operations during processing.
> How this procedure actually occurs differs across different types of state 
> backends.
> For example, for state backends that eagerly deserialize / lazily serialize 
> state (e.g. {{HeapStateBackend}}), the job execution itself can be seen as a 
> "migration"; everything is deserialized to state objects on restore, and is 
> only serialized again, with the new serializer, on checkpoints.
> Therefore, for these state backends, the above process is irrelevant.
> On the other hand, for state backends that lazily deserialize / eagerly 
> serialize state (e.g. {{RocksDBStateBackend}}), the state evolution process 
> needs to happen for every state with a newly registered incompatible 
> serializer.
> Procedure 2. would even allow state type migrations, but that is out of scope 
> for this JIRA.
>  This ticket focuses only on procedures 1. and 3., where we try to enable 
> schema evolution without state type changes.
> This is an umbrella JIRA ticket that tracks this feature, including a few 
> preliminary tasks that work towards enabling it.
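
The three-step migration described in the ticket can be sketched as follows. The `Ser` interface and the in-memory store are hypothetical stand-ins (Flink's real `TypeSerializer` works on data streams and is considerably richer, and RocksDB takes the place of the map); the sketch only illustrates the read-with-old, map, write-with-new loop.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StateMigrationSketch {

    // Hypothetical minimal serializer interface, standing in for Flink's TypeSerializer.
    interface Ser<T> {
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    // Steps from the ticket, applied to every entry of the state store:
    //  1. read the serialized state with the previous serializer,
    //  2. pass the deserialized object through a migration map function,
    //  3. write the state back with the new serializer.
    static <A, B> void migrate(Map<String, byte[]> store,
                               Ser<A> previousSer,
                               Ser<B> newSer,
                               Function<A, B> migrationMapFunction) {
        for (Map.Entry<String, byte[]> entry : store.entrySet()) {
            A oldValue = previousSer.deserialize(entry.getValue()); // step 1
            B newValue = migrationMapFunction.apply(oldValue);      // step 2
            entry.setValue(newSer.serialize(newValue));             // step 3
        }
    }

    // Example serializers: UTF-8 strings vs. decimal-encoded integers.
    static final Ser<String> STRING_SER = new Ser<String>() {
        public byte[] serialize(String v) { return v.getBytes(StandardCharsets.UTF_8); }
        public String deserialize(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
    };

    static final Ser<Integer> INT_SER = new Ser<Integer>() {
        public byte[] serialize(Integer v) { return v.toString().getBytes(StandardCharsets.UTF_8); }
        public Integer deserialize(byte[] b) { return Integer.valueOf(new String(b, StandardCharsets.UTF_8)); }
    };

    public static void main(String[] args) {
        Map<String, byte[]> store = new HashMap<>();
        store.put("counter", STRING_SER.serialize("42"));

        // Migrate the stored value from the String schema to the Integer schema.
        migrate(store, STRING_SER, INT_SER, Integer::valueOf);
    }
}
```

For eagerly deserializing backends like {{HeapStateBackend}} this loop is unnecessary, as the description notes; for RocksDB it is roughly what the PR's snapshot-and-iterate code performs, one serialized value at a time.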



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
