asfgit closed pull request #7264: [FLINK-11094] [rocksdb] Make non-accessed 
restored state in RocksDBStateBackend snapshottable
URL: https://github.com/apache/flink/pull/7264
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub does not show
its diff once the pull request has been merged, so the diff is supplied below:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 4702919384f..952dffbc70f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -26,7 +26,6 @@
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -105,23 +104,10 @@
         */
        private final boolean asynchronousSnapshots;
 
-       /**
-        * Map of state names to their corresponding restored state meta info.
-        *
-        * <p>TODO this map can be removed when eager-state registration is in 
place.
-        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
-        */
-       private final Map<String, StateMetaInfoSnapshot> 
restoredOperatorStateMetaInfos;
-
-       /**
-        * Map of state names to their corresponding restored broadcast state 
meta info.
-        */
-       private final Map<String, StateMetaInfoSnapshot> 
restoredBroadcastStateMetaInfos;
-
        /**
         * Cache of already accessed states.
         *
-        * <p>In contrast to {@link #registeredOperatorStates} and {@link 
#restoredOperatorStateMetaInfos} which may be repopulated
+        * <p>In contrast to {@link #registeredOperatorStates} which may be 
repopulated
         * with restored state, this map is always empty at the beginning.
         *
         * <p>TODO this map should be moved to a base class once we have proper 
hierarchy for the operator state backends.
@@ -148,8 +134,6 @@ public DefaultOperatorStateBackend(
                this.asynchronousSnapshots = asynchronousSnapshots;
                this.accessedStatesByName = new HashMap<>();
                this.accessedBroadcastStatesByName = new HashMap<>();
-               this.restoredOperatorStateMetaInfos = new HashMap<>();
-               this.restoredBroadcastStateMetaInfos = new HashMap<>();
                this.snapshotStrategy = new 
DefaultOperatorStateBackendSnapshotStrategy();
        }
 
@@ -226,34 +210,22 @@ public void dispose() {
                                        
broadcastState.getStateMetaInfo().getAssignmentMode(),
                                        OperatorStateHandle.Mode.BROADCAST);
 
-                       final StateMetaInfoSnapshot metaInfoSnapshot = 
restoredBroadcastStateMetaInfos.get(name);
+                       RegisteredBroadcastStateBackendMetaInfo<K, V> 
restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();
 
                        // check whether new serializers are incompatible
-                       TypeSerializerSnapshot<K> keySerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<K>) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
-
                        TypeSerializerSchemaCompatibility<K> keyCompatibility =
-                               
keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
+                               
restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
                        if (keyCompatibility.isIncompatible()) {
                                throw new StateMigrationException("The new key 
serializer for broadcast state must not be incompatible.");
                        }
 
-                       TypeSerializerSnapshot<V> valueSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<V>) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-
                        TypeSerializerSchemaCompatibility<V> valueCompatibility 
=
-                               
valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
+                               
restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
                        if (valueCompatibility.isIncompatible()) {
                                throw new StateMigrationException("The new 
value serializer for broadcast state must not be incompatible.");
                        }
 
-                       // new serializer is compatible; use it to replace the 
old serializer
-                       broadcastState.setStateMetaInfo(
-                                       new 
RegisteredBroadcastStateBackendMetaInfo<>(
-                                                       name,
-                                                       
OperatorStateHandle.Mode.BROADCAST,
-                                                       
broadcastStateKeySerializer,
-                                                       
broadcastStateValueSerializer));
+                       
broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
                }
 
                accessedBroadcastStatesByName.put(name, broadcastState);
@@ -345,8 +317,6 @@ public void restore(Collection<OperatorStateHandle> 
restoreSnapshots) throws Exc
                                                        " not be loaded. This 
is a temporary restriction that will be fixed in future versions.");
                                        }
 
-                                       
restoredOperatorStateMetaInfos.put(restoredSnapshot.getName(), 
restoredSnapshot);
-
                                        PartitionableListState<?> listState = 
registeredOperatorStates.get(restoredSnapshot.getName());
 
                                        if (null == listState) {
@@ -381,8 +351,6 @@ public void restore(Collection<OperatorStateHandle> 
restoreSnapshots) throws Exc
                                                                " not be 
loaded. This is a temporary restriction that will be fixed in future 
versions.");
                                        }
 
-                                       
restoredBroadcastStateMetaInfos.put(restoredSnapshot.getName(), 
restoredSnapshot);
-
                                        BackendWritableBroadcastState<? ,?> 
broadcastState = registeredBroadcastStates.get(restoredSnapshot.getName());
 
                                        if (broadcastState == null) {
@@ -590,25 +558,19 @@ public void addAll(List<S> values) {
                                        
partitionableListState.getStateMetaInfo().getAssignmentMode(),
                                        mode);
 
-                       StateMetaInfoSnapshot restoredSnapshot = 
restoredOperatorStateMetaInfos.get(name);
-                       RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
-                               new 
RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
+                       RegisteredOperatorStateBackendMetaInfo<S> 
restoredPartitionableListStateMetaInfo =
+                               partitionableListState.getStateMetaInfo();
 
-                       // check compatibility to determine if state migration 
is required
+                       // check compatibility to determine if new serializers 
are incompatible
                        TypeSerializer<S> newPartitionStateSerializer = 
partitionStateSerializer.duplicate();
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<S> stateSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<S>) 
restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-
                        TypeSerializerSchemaCompatibility<S> stateCompatibility 
=
-                               
stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
+                               
restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
                        if (stateCompatibility.isIncompatible()) {
                                throw new StateMigrationException("The new 
state serializer for operator state must not be incompatible.");
                        }
 
-                       partitionableListState.setStateMetaInfo(
-                               new 
RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, 
mode));
+                       
partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
                }
 
                accessedStatesByName.put(name, partitionableListState);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
index 70a14142474..ecc13faa43d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,11 +40,11 @@
 
        /** The type serializer for the keys in the map state. */
        @Nonnull
-       private final TypeSerializer<K> keySerializer;
+       private final StateSerializerProvider<K> keySerializerProvider;
 
        /** The type serializer for the values in the map state. */
        @Nonnull
-       private final TypeSerializer<V> valueSerializer;
+       private final StateSerializerProvider<V> valueSerializerProvider;
 
        public RegisteredBroadcastStateBackendMetaInfo(
                        @Nonnull final String name,
@@ -50,19 +52,19 @@ public RegisteredBroadcastStateBackendMetaInfo(
                        @Nonnull final TypeSerializer<K> keySerializer,
                        @Nonnull final TypeSerializer<V> valueSerializer) {
 
-               super(name);
-               Preconditions.checkArgument(assignmentMode == 
OperatorStateHandle.Mode.BROADCAST);
-               this.assignmentMode = assignmentMode;
-               this.keySerializer = keySerializer;
-               this.valueSerializer = valueSerializer;
+               this(
+                       name,
+                       assignmentMode,
+                       StateSerializerProvider.fromNewState(keySerializer),
+                       StateSerializerProvider.fromNewState(valueSerializer));
        }
 
        public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
                this(
                        Preconditions.checkNotNull(copy).name,
                        copy.assignmentMode,
-                       copy.keySerializer.duplicate(),
-                       copy.valueSerializer.duplicate());
+                       copy.getKeySerializer().duplicate(),
+                       copy.getValueSerializer().duplicate());
        }
 
        @SuppressWarnings("unchecked")
@@ -71,10 +73,13 @@ public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot sn
                        snapshot.getName(),
                        OperatorStateHandle.Mode.valueOf(
                                
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-                       (TypeSerializer<K>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
-                       (TypeSerializer<V>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<K>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<V>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == 
snapshot.getBackendStateType());
        }
 
@@ -86,6 +91,19 @@ public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot sn
                return new RegisteredBroadcastStateBackendMetaInfo<>(this);
        }
 
+       private RegisteredBroadcastStateBackendMetaInfo(
+               @Nonnull final String name,
+               @Nonnull final OperatorStateHandle.Mode assignmentMode,
+               @Nonnull final StateSerializerProvider<K> keySerializerProvider,
+               @Nonnull final StateSerializerProvider<V> 
valueSerializerProvider) {
+
+               super(name);
+               Preconditions.checkArgument(assignmentMode == 
OperatorStateHandle.Mode.BROADCAST);
+               this.assignmentMode = assignmentMode;
+               this.keySerializerProvider = keySerializerProvider;
+               this.valueSerializerProvider = valueSerializerProvider;
+       }
+
        @Nonnull
        @Override
        public StateMetaInfoSnapshot snapshot() {
@@ -94,12 +112,32 @@ public StateMetaInfoSnapshot snapshot() {
 
        @Nonnull
        public TypeSerializer<K> getKeySerializer() {
-               return keySerializer;
+               return keySerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<K> 
updateKeySerializer(TypeSerializer<K> newKeySerializer) {
+               return 
keySerializerProvider.registerNewSerializerForRestoredState(newKeySerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<K> getPreviousKeySerializer() {
+               return keySerializerProvider.previousSchemaSerializer();
        }
 
        @Nonnull
        public TypeSerializer<V> getValueSerializer() {
-               return valueSerializer;
+               return valueSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<V> 
updateValueSerializer(TypeSerializer<V> newValueSerializer) {
+               return 
valueSerializerProvider.registerNewSerializerForRestoredState(newValueSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<V> getPreviousValueSerializer() {
+               return valueSerializerProvider.previousSchemaSerializer();
        }
 
        @Nonnull
@@ -122,16 +160,16 @@ public boolean equals(Object obj) {
 
                return Objects.equals(name, other.getName())
                                && Objects.equals(assignmentMode, 
other.getAssignmentMode())
-                               && Objects.equals(keySerializer, 
other.getKeySerializer())
-                               && Objects.equals(valueSerializer, 
other.getValueSerializer());
+                               && Objects.equals(getKeySerializer(), 
other.getKeySerializer())
+                               && Objects.equals(getValueSerializer(), 
other.getValueSerializer());
        }
 
        @Override
        public int hashCode() {
                int result = name.hashCode();
                result = 31 * result + assignmentMode.hashCode();
-               result = 31 * result + keySerializer.hashCode();
-               result = 31 * result + valueSerializer.hashCode();
+               result = 31 * result + getKeySerializer().hashCode();
+               result = 31 * result + getValueSerializer().hashCode();
                return result;
        }
 
@@ -139,8 +177,8 @@ public int hashCode() {
        public String toString() {
                return "RegisteredBroadcastBackendStateMetaInfo{" +
                                "name='" + name + '\'' +
-                               ", keySerializer=" + keySerializer +
-                               ", valueSerializer=" + valueSerializer +
+                               ", keySerializer=" + getKeySerializer() +
+                               ", valueSerializer=" + getValueSerializer() +
                                ", assignmentMode=" + assignmentMode +
                                '}';
        }
@@ -154,8 +192,12 @@ private StateMetaInfoSnapshot computeSnapshot() {
                Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshotsMap = new HashMap<>(2);
                String keySerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
                String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+
+               TypeSerializer<K> keySerializer = getKeySerializer();
                serializerMap.put(keySerializerKey, keySerializer.duplicate());
                serializerConfigSnapshotsMap.put(keySerializerKey, 
keySerializer.snapshotConfiguration());
+
+               TypeSerializer<V> valueSerializer = getValueSerializer();
                serializerMap.put(valueSerializerKey, 
valueSerializer.duplicate());
                serializerConfigSnapshotsMap.put(valueSerializerKey, 
valueSerializer.snapshotConfiguration());
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index d05f31a0c5c..b37c79de026 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
@@ -44,18 +45,24 @@
        @Nonnull
        private final StateDescriptor.Type stateType;
        @Nonnull
-       private final TypeSerializer<N> namespaceSerializer;
+       private final StateSerializerProvider<N> namespaceSerializerProvider;
        @Nonnull
-       private final TypeSerializer<S> stateSerializer;
+       private final StateSerializerProvider<S> stateSerializerProvider;
        @Nullable
-       private final StateSnapshotTransformer<S> snapshotTransformer;
+       private StateSnapshotTransformer<S> snapshotTransformer;
 
        public RegisteredKeyValueStateBackendMetaInfo(
                @Nonnull StateDescriptor.Type stateType,
                @Nonnull String name,
                @Nonnull TypeSerializer<N> namespaceSerializer,
                @Nonnull TypeSerializer<S> stateSerializer) {
-               this(stateType, name, namespaceSerializer, stateSerializer, 
null);
+
+               this(
+                       stateType,
+                       name,
+                       
StateSerializerProvider.fromNewState(namespaceSerializer),
+                       StateSerializerProvider.fromNewState(stateSerializer),
+                       null);
        }
 
        public RegisteredKeyValueStateBackendMetaInfo(
@@ -65,11 +72,12 @@ public RegisteredKeyValueStateBackendMetaInfo(
                @Nonnull TypeSerializer<S> stateSerializer,
                @Nullable StateSnapshotTransformer<S> snapshotTransformer) {
 
-               super(name);
-               this.stateType = stateType;
-               this.namespaceSerializer = namespaceSerializer;
-               this.stateSerializer = stateSerializer;
-               this.snapshotTransformer = snapshotTransformer;
+               this(
+                       stateType,
+                       name,
+                       
StateSerializerProvider.fromNewState(namespaceSerializer),
+                       StateSerializerProvider.fromNewState(stateSerializer),
+                       snapshotTransformer);
        }
 
        @SuppressWarnings("unchecked")
@@ -77,13 +85,31 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot sna
                this(
                        
StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
                        snapshot.getName(),
-                       (TypeSerializer<N>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
-                       (TypeSerializer<S>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
 null);
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<N>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<S>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
+                       null);
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == 
snapshot.getBackendStateType());
        }
 
+       private RegisteredKeyValueStateBackendMetaInfo(
+               @Nonnull StateDescriptor.Type stateType,
+               @Nonnull String name,
+               @Nonnull StateSerializerProvider<N> namespaceSerializerProvider,
+               @Nonnull StateSerializerProvider<S> stateSerializerProvider,
+               @Nullable StateSnapshotTransformer<S> snapshotTransformer) {
+
+               super(name);
+               this.stateType = stateType;
+               this.namespaceSerializerProvider = namespaceSerializerProvider;
+               this.stateSerializerProvider = stateSerializerProvider;
+               this.snapshotTransformer = snapshotTransformer;
+       }
+
        @Nonnull
        public StateDescriptor.Type getStateType() {
                return stateType;
@@ -91,12 +117,32 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot sna
 
        @Nonnull
        public TypeSerializer<N> getNamespaceSerializer() {
-               return namespaceSerializer;
+               return namespaceSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<N> 
updateNamespaceSerializer(TypeSerializer<N> newNamespaceSerializer) {
+               return 
namespaceSerializerProvider.registerNewSerializerForRestoredState(newNamespaceSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<N> getPreviousNamespaceSerializer() {
+               return namespaceSerializerProvider.previousSchemaSerializer();
        }
 
        @Nonnull
        public TypeSerializer<S> getStateSerializer() {
-               return stateSerializer;
+               return stateSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<S> 
updateStateSerializer(TypeSerializer<S> newStateSerializer) {
+               return 
stateSerializerProvider.registerNewSerializerForRestoredState(newStateSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<S> getPreviousStateSerializer() {
+               return stateSerializerProvider.previousSchemaSerializer();
        }
 
        @Nullable
@@ -104,6 +150,10 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot sna
                return snapshotTransformer;
        }
 
+       public void updateSnapshotTransformer(StateSnapshotTransformer<S> 
snapshotTransformer) {
+               this.snapshotTransformer = snapshotTransformer;
+       }
+
        @Override
        public boolean equals(Object o) {
                if (this == o) {
@@ -133,8 +183,8 @@ public String toString() {
                return "RegisteredKeyedBackendStateMetaInfo{" +
                                "stateType=" + stateType +
                                ", name='" + name + '\'' +
-                               ", namespaceSerializer=" + namespaceSerializer +
-                               ", stateSerializer=" + stateSerializer +
+                               ", namespaceSerializer=" + 
getNamespaceSerializer() +
+                               ", stateSerializer=" + getStateSerializer() +
                                '}';
        }
 
@@ -153,34 +203,19 @@ public StateMetaInfoSnapshot snapshot() {
                return computeSnapshot();
        }
 
-       public static void checkStateMetaInfo(StateMetaInfoSnapshot 
stateMetaInfoSnapshot, StateDescriptor<?, ?> stateDesc) {
-               Preconditions.checkState(
-                       stateMetaInfoSnapshot != null,
-                       "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                               " but its corresponding restored snapshot 
cannot be found.");
-
-               
Preconditions.checkState(stateMetaInfoSnapshot.getBackendStateType()
-                               == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
-                       "Incompatible state types. " +
-                               "Was [" + 
stateMetaInfoSnapshot.getBackendStateType() + "], " +
-                               "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+       public void checkStateMetaInfo(StateDescriptor<?, ?> stateDesc) {
 
                Preconditions.checkState(
-                       Objects.equals(stateDesc.getName(), 
stateMetaInfoSnapshot.getName()),
+                       Objects.equals(stateDesc.getName(), getName()),
                        "Incompatible state names. " +
-                               "Was [" + stateMetaInfoSnapshot.getName() + "], 
" +
+                               "Was [" + getName() + "], " +
                                "registered with [" + stateDesc.getName() + 
"].");
 
-               final StateDescriptor.Type restoredType =
-                       StateDescriptor.Type.valueOf(
-                               stateMetaInfoSnapshot.getOption(
-                                       
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
-
-               if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && 
restoredType != StateDescriptor.Type.UNKNOWN) {
+               if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && 
getStateType() != StateDescriptor.Type.UNKNOWN) {
                        Preconditions.checkState(
-                               stateDesc.getType() == restoredType,
+                               stateDesc.getType() == getStateType(),
                                "Incompatible key/value state types. " +
-                                       "Was [" + restoredType + "], " +
+                                       "Was [" + getStateType() + "], " +
                                        "registered with [" + 
stateDesc.getType() + "].");
                }
        }
@@ -194,8 +229,12 @@ private StateMetaInfoSnapshot computeSnapshot() {
                Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshotsMap = new HashMap<>(2);
                String namespaceSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
                String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+
+               TypeSerializer<N> namespaceSerializer = 
getNamespaceSerializer();
                serializerMap.put(namespaceSerializerKey, 
namespaceSerializer.duplicate());
                serializerConfigSnapshotsMap.put(namespaceSerializerKey, 
namespaceSerializer.snapshotConfiguration());
+
+               TypeSerializer<S> stateSerializer = getStateSerializer();
                serializerMap.put(valueSerializerKey, 
stateSerializer.duplicate());
                serializerConfigSnapshotsMap.put(valueSerializerKey, 
stateSerializer.snapshotConfiguration());
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
index 10ba0296057..921947a4dd0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.Map;
@@ -46,21 +48,22 @@
         * The type serializer for the elements in the state list
         */
        @Nonnull
-       private final TypeSerializer<S> partitionStateSerializer;
+       private final StateSerializerProvider<S> 
partitionStateSerializerProvider;
 
        public RegisteredOperatorStateBackendMetaInfo(
                        @Nonnull String name,
                        @Nonnull TypeSerializer<S> partitionStateSerializer,
                        @Nonnull OperatorStateHandle.Mode assignmentMode) {
-               super(name);
-               this.partitionStateSerializer = partitionStateSerializer;
-               this.assignmentMode = assignmentMode;
+               this(
+                       name,
+                       
StateSerializerProvider.fromNewState(partitionStateSerializer),
+                       assignmentMode);
        }
 
        private RegisteredOperatorStateBackendMetaInfo(@Nonnull 
RegisteredOperatorStateBackendMetaInfo<S> copy) {
                this(
                        Preconditions.checkNotNull(copy).name,
-                       copy.partitionStateSerializer.duplicate(),
+                       copy.getPartitionStateSerializer().duplicate(),
                        copy.assignmentMode);
        }
 
@@ -68,13 +71,24 @@ private RegisteredOperatorStateBackendMetaInfo(@Nonnull 
RegisteredOperatorStateB
        public RegisteredOperatorStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
                this(
                        snapshot.getName(),
-                       (TypeSerializer<S>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<S>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
                        OperatorStateHandle.Mode.valueOf(
                                
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == 
snapshot.getBackendStateType());
        }
 
+       private RegisteredOperatorStateBackendMetaInfo(
+                       @Nonnull String name,
+                       @Nonnull StateSerializerProvider<S> 
partitionStateSerializerProvider,
+                       @Nonnull OperatorStateHandle.Mode assignmentMode) {
+               super(name);
+               this.partitionStateSerializerProvider = 
partitionStateSerializerProvider;
+               this.assignmentMode = assignmentMode;
+       }
+
        /**
         * Creates a deep copy of the itself.
         */
@@ -96,7 +110,17 @@ public StateMetaInfoSnapshot snapshot() {
 
        @Nonnull
        public TypeSerializer<S> getPartitionStateSerializer() {
-               return partitionStateSerializer;
+               return 
partitionStateSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<S> 
updatePartitionStateSerializer(TypeSerializer<S> newPartitionStateSerializer) {
+               return 
partitionStateSerializerProvider.registerNewSerializerForRestoredState(newPartitionStateSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<S> getPreviousPartitionStateSerializer() {
+               return 
partitionStateSerializerProvider.previousSchemaSerializer();
        }
 
        @Override
@@ -112,7 +136,7 @@ public boolean equals(Object obj) {
                return (obj instanceof RegisteredOperatorStateBackendMetaInfo)
                        && 
name.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getName())
                        && 
assignmentMode.equals(((RegisteredOperatorStateBackendMetaInfo) 
obj).getAssignmentMode())
-                       && 
partitionStateSerializer.equals(((RegisteredOperatorStateBackendMetaInfo) 
obj).getPartitionStateSerializer());
+                       && 
getPartitionStateSerializer().equals(((RegisteredOperatorStateBackendMetaInfo) 
obj).getPartitionStateSerializer());
        }
 
        @Override
@@ -128,7 +152,7 @@ public String toString() {
                return "RegisteredOperatorBackendStateMetaInfo{" +
                        "name='" + name + "\'" +
                        ", assignmentMode=" + assignmentMode +
-                       ", partitionStateSerializer=" + 
partitionStateSerializer +
+                       ", partitionStateSerializer=" + 
getPartitionStateSerializer() +
                        '}';
        }
 
@@ -138,6 +162,8 @@ private StateMetaInfoSnapshot computeSnapshot() {
                        
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
                        assignmentMode.toString());
                String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+
+               TypeSerializer<S> partitionStateSerializer = 
getPartitionStateSerializer();
                Map<String, TypeSerializer<?>> serializerMap =
                        Collections.singletonMap(valueSerializerKey, 
partitionStateSerializer.duplicate());
                Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshotsMap =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
index 0304b929c6d..961d96fa405 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.Map;
@@ -34,24 +36,34 @@
 public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends 
RegisteredStateMetaInfoBase {
 
        @Nonnull
-       private final TypeSerializer<T> elementSerializer;
+       private final StateSerializerProvider<T> elementSerializerProvider;
 
        public RegisteredPriorityQueueStateBackendMetaInfo(
                @Nonnull String name,
                @Nonnull TypeSerializer<T> elementSerializer) {
 
-               super(name);
-               this.elementSerializer = elementSerializer;
+               this(name, 
StateSerializerProvider.fromNewState(elementSerializer));
        }
 
        @SuppressWarnings("unchecked")
        public 
RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
-               this(snapshot.getName(),
-                       (TypeSerializer<T>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+               this(
+                       snapshot.getName(),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<T>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE 
== snapshot.getBackendStateType());
        }
 
+       private RegisteredPriorityQueueStateBackendMetaInfo(
+               @Nonnull String name,
+               @Nonnull StateSerializerProvider<T> elementSerializerProvider) {
+
+               super(name);
+               this.elementSerializerProvider = elementSerializerProvider;
+       }
+
        @Nonnull
        @Override
        public StateMetaInfoSnapshot snapshot() {
@@ -60,10 +72,21 @@ public StateMetaInfoSnapshot snapshot() {
 
        @Nonnull
        public TypeSerializer<T> getElementSerializer() {
-               return elementSerializer;
+               return elementSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<T> 
updateElementSerializer(TypeSerializer<T> newElementSerializer) {
+               return 
elementSerializerProvider.registerNewSerializerForRestoredState(newElementSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<T> getPreviousElementSerializer() {
+               return elementSerializerProvider.previousSchemaSerializer();
        }
 
        private StateMetaInfoSnapshot computeSnapshot() {
+               TypeSerializer<T> elementSerializer = getElementSerializer();
                Map<String, TypeSerializer<?>> serializerMap =
                        Collections.singletonMap(
                                
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
@@ -82,6 +105,6 @@ private StateMetaInfoSnapshot computeSnapshot() {
        }
 
        public RegisteredPriorityQueueStateBackendMetaInfo deepCopy() {
-               return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, 
elementSerializer.duplicate());
+               return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, 
getElementSerializer().duplicate());
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index 4132d144a4a..b7dff59aef0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,4 +42,21 @@ public String getName() {
 
        @Nonnull
        public abstract StateMetaInfoSnapshot snapshot();
+
+       public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull 
StateMetaInfoSnapshot snapshot) {
+
+               final StateMetaInfoSnapshot.BackendStateType backendStateType = 
snapshot.getBackendStateType();
+               switch (backendStateType) {
+                       case KEY_VALUE:
+                               return new 
RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+                       case OPERATOR:
+                               return new 
RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+                       case BROADCAST:
+                               return new 
RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+                       case PRIORITY_QUEUE:
+                               return new 
RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
+                       default:
+                               throw new IllegalArgumentException("Unknown 
backend state type: " + backendStateType);
+               }
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
new file mode 100644
index 00000000000..a24f12e42fb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateSerializerProvider} wraps logic on how to obtain serializers 
for registered state,
+ * either with the previous schema of state in checkpoints or the current 
schema of state.
+ *
+ * @param <T> the type of the state.
+ */
+@Internal
+public abstract class StateSerializerProvider<T> {
+
+       /**
+        * The registered serializer for the state.
+        *
+        * <p>In the case that this provider was created from a restored 
serializer snapshot via
+        * {@link #fromRestoredState(TypeSerializerSnapshot)}, but a new 
serializer was never registered
+        * for the state (i.e., this is the case if a restored state was never 
accessed), this would be {@code null}.
+        */
+       @Nullable
+       TypeSerializer<T> registeredSerializer;
+
+       /**
+        * Creates a {@link StateSerializerProvider} for restored state from 
the previous serializer's snapshot.
+        *
+        * <p>Once a new serializer is registered for the state, it should be 
provided via
+        * the {@link #registerNewSerializerForRestoredState(TypeSerializer)} 
method.
+        *
+        * @param stateSerializerSnapshot the previous serializer's snapshot.
+        * @param <T> the type of the state.
+        *
+        * @return a new {@link StateSerializerProvider} for restored state.
+        */
+       public static <T> StateSerializerProvider<T> 
fromRestoredState(TypeSerializerSnapshot<T> stateSerializerSnapshot) {
+               return new 
RestoredStateSerializerProvider<>(stateSerializerSnapshot);
+       }
+
+       /**
+        * Creates a {@link StateSerializerProvider} for new state from the 
registered state serializer.
+        *
+        * @param registeredStateSerializer the new state's registered 
serializer.
+        * @param <T> the type of the state.
+        *
+        * @return a new {@link StateSerializerProvider} for new state.
+        */
+       public static <T> StateSerializerProvider<T> 
fromNewState(TypeSerializer<T> registeredStateSerializer) {
+               return new 
NewStateSerializerProvider<>(registeredStateSerializer);
+       }
+
+       private StateSerializerProvider(@Nullable TypeSerializer<T> 
stateSerializer) {
+               this.registeredSerializer = stateSerializer;
+       }
+
+       /**
+        * Gets the serializer that recognizes the current serialization schema 
of the state.
+        * This is the serializer that should be used for regular state 
serialization and
+        * deserialization after state has been restored.
+        *
+        * <p>If this provider was created from a restored state's serializer 
snapshot, while a
+        * new serializer (with a new schema) was not registered for the state 
(i.e., because
+        * the state was never accessed after it was restored), then the schema 
of state remains
+        * identical. Therefore, in this case, it is guaranteed that the 
serializer returned by
+        * this method is the same as the one returned by {@link 
#previousSchemaSerializer()}.
+        *
+        * <p>If this provider was created from new state, then this always 
returns the
+        * serializer that the new state was registered with.
+        *
+        * @return a serializer that reads and writes in the current schema of 
the state.
+        */
+       @Nonnull
+       public abstract TypeSerializer<T> currentSchemaSerializer();
+
+       /**
+        * Gets the serializer that recognizes the previous serialization 
schema of the state.
+        * This is the serializer that should be used for restoring the state, 
i.e. when the state
+        * is still in the previous serialization schema.
+        *
+        * <p>This method can only be used if this provider was created from a 
restored state's serializer
+        * snapshot. If this provider was created from new state, then this 
method is
+        * irrelevant, since there doesn't exist any previous version of the 
state schema.
+        *
+        * @return a serializer that reads and writes in the previous schema of 
the state.
+        */
+       @Nonnull
+       public abstract TypeSerializer<T> previousSchemaSerializer();
+
+       /**
+        * For restored state, register a new serializer that potentially has a 
new serialization schema.
+        *
+        * <p>Users are allowed to register serializers for state only once. 
Therefore, this method
+        * is irrelevant if this provider was created from new state, since a 
state serializer had
+        * been registered already.
+        *
+        * <p>For the case where this provider was created from restored state, 
then this method should
+        * be called at most once. The new serializer will be checked for its 
schema compatibility with the
+        * previous serializer's schema, and returned to the caller. The caller 
is responsible for
+        * checking the result and reacting appropriately to it, as follows:
+        * <ul>
+        *     <li>{@link 
TypeSerializerSchemaCompatibility#isCompatibleAsIs()}: nothing needs to be done.
+        *     {@link #currentSchemaSerializer()} now returns the newly 
registered serializer.</li>
+        *     <li>{@link 
TypeSerializerSchemaCompatibility#isCompatibleAfterMigration()}: state 
needs to be
+        *     migrated before the serializer returned by {@link 
#currentSchemaSerializer()} can be used.
+        *     The migration should be performed by reading the state with 
{@link #previousSchemaSerializer()},
+        *     and then writing it again with {@link 
#currentSchemaSerializer()}.</li>
+        *     <li>{@link TypeSerializerSchemaCompatibility#isIncompatible()}: 
the registered serializer is
+        *     incompatible. {@link #currentSchemaSerializer()} can no longer 
return a serializer for
+        *     the state, and therefore this provider shouldn't be used 
anymore.</li>
+        * </ul>
+        *
+        * @return the schema compatibility of the new registered serializer, 
with respect to the previous serializer.
+        */
+       @Nonnull
+       public abstract TypeSerializerSchemaCompatibility<T> 
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer);
+
+       /**
+        * Implementation of the {@link StateSerializerProvider} for the 
restored state case.
+        */
+       private static class RestoredStateSerializerProvider<T> extends 
StateSerializerProvider<T> {
+
+               /**
+                * The snapshot of the previous serializer of the state.
+                */
+               @Nonnull
+               private final TypeSerializerSnapshot<T> 
previousSerializerSnapshot;
+
+               private boolean isRegisteredWithIncompatibleSerializer = false;
+
+               RestoredStateSerializerProvider(TypeSerializerSnapshot<T> 
previousSerializerSnapshot) {
+                       super(null);
+                       this.previousSerializerSnapshot = 
Preconditions.checkNotNull(previousSerializerSnapshot);
+               }
+
+               /**
+                * The restore serializer, lazily created only when the restore 
serializer is accessed.
+                *
+                * <p>NOTE: It is important to only create this lazily, so that 
off-heap
+                * state does not fail eagerly when restoring state that has a
+                * {@link UnloadableDummyTypeSerializer} as the previous 
serializer. This should
+                * be relevant only for restores from Flink versions prior to 
1.7.x.
+                */
+               @Nullable
+               private TypeSerializer<T> cachedRestoredSerializer;
+
+               @Override
+               @Nonnull
+               public TypeSerializer<T> currentSchemaSerializer() {
+                       if (registeredSerializer != null) {
+                               checkState(
+                                       !isRegisteredWithIncompatibleSerializer,
+                                       "Unable to provide a serializer with 
the current schema, because the restored state was " +
+                                               "registered with a new 
serializer that has incompatible schema.");
+
+                                       return registeredSerializer;
+                       }
+
+                       // if we are not yet registered with a new serializer,
+                       // we can just use the restore serializer to read / 
write the state.
+                       return previousSchemaSerializer();
+               }
+
+               @Nonnull
+               public TypeSerializerSchemaCompatibility<T> 
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
+                       checkNotNull(newSerializer);
+                       if (registeredSerializer != null) {
+                               throw new UnsupportedOperationException("A 
serializer has already been registered for the state; re-registration is not 
allowed.");
+                       }
+
+                       TypeSerializerSchemaCompatibility<T> result = 
previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
+                       if (result.isIncompatible()) {
+                               this.isRegisteredWithIncompatibleSerializer = 
true;
+                       }
+                       this.registeredSerializer = newSerializer;
+                       return result;
+               }
+
+               @Nonnull
+               public final TypeSerializer<T> previousSchemaSerializer() {
+                       if (cachedRestoredSerializer != null) {
+                               return cachedRestoredSerializer;
+                       }
+
+                       this.cachedRestoredSerializer = 
previousSerializerSnapshot.restoreSerializer();
+                       return cachedRestoredSerializer;
+               }
+       }
+
+       /**
+        * Implementation of the {@link StateSerializerProvider} for the new 
state case.
+        */
+       private static class NewStateSerializerProvider<T> extends 
StateSerializerProvider<T> {
+
+               NewStateSerializerProvider(TypeSerializer<T> 
registeredStateSerializer) {
+                       
super(Preconditions.checkNotNull(registeredStateSerializer));
+               }
+
+               @Override
+               @Nonnull
+               @SuppressWarnings("ConstantConditions")
+               public TypeSerializer<T> currentSchemaSerializer() {
+                       return registeredSerializer;
+               }
+
+               @Override
+               @Nonnull
+               public TypeSerializerSchemaCompatibility<T> 
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
+                       throw new UnsupportedOperationException("A serializer 
has already been registered for the state; re-registration is not allowed.");
+               }
+
+               @Override
+               @Nonnull
+               public TypeSerializer<T> previousSchemaSerializer() {
+                       throw new UnsupportedOperationException("This is a 
NewStateSerializerProvider; you cannot get a restore serializer because there 
was no restored state.");
+               }
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 4eff3a285bb..3f8761b657a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -30,7 +30,6 @@
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -79,6 +78,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -125,14 +125,6 @@
         */
        private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> 
registeredPQStates;
 
-       /**
-        * Map of state names to their corresponding restored state meta info.
-        *
-        * <p>TODO this map can be removed when eager-state registration is in 
place.
-        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
-        */
-       private final Map<StateUID, StateMetaInfoSnapshot> 
restoredStateMetaInfo;
-
        /**
         * The configuration for local recovery.
         */
@@ -173,7 +165,6 @@ public HeapKeyedStateBackend(
 
                this.snapshotStrategy = new 
HeapSnapshotStrategy(synchronicityTrait);
                LOG.info("Initializing heap keyed state backend with stream 
factory.");
-               this.restoredStateMetaInfo = new HashMap<>();
                this.priorityQueueSetFactory = priorityQueueSetFactory;
        }
 
@@ -194,23 +185,9 @@ public HeapKeyedStateBackend(
                        // TODO we implement the simple way of supporting the 
current functionality, mimicking keyed state
                        // because this should be reworked in FLINK-9376 and 
then we should have a common algorithm over
                        // StateMetaInfoSnapshot that avoids this code 
duplication.
-                       StateMetaInfoSnapshot restoredMetaInfoSnapshot =
-                               
restoredStateMetaInfo.get(StateUID.of(stateName, 
StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE));
-
-                       Preconditions.checkState(
-                               restoredMetaInfoSnapshot != null,
-                               "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                                       " but its corresponding restored 
snapshot cannot be found.");
-
-                       StateMetaInfoSnapshot.CommonSerializerKeys 
serializerKey =
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
-
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<T> serializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<T>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
 
                        TypeSerializerSchemaCompatibility<T> 
compatibilityResult =
-                               
serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
+                               
existingState.getMetaInfo().updateElementSerializer(byteOrderedElementSerializer);
 
                        if (compatibilityResult.isIncompatible()) {
                                throw new FlinkRuntimeException(new 
StateMigrationException("For heap backends, the new priority queue serializer 
must not be incompatible."));
@@ -252,57 +229,42 @@ public HeapKeyedStateBackend(
        private <N, V> StateTable<K, N, V> tryRegisterStateTable(
                        TypeSerializer<N> namespaceSerializer,
                        StateDescriptor<?, V> stateDesc,
-                       StateSnapshotTransformer<V> snapshotTransformer) throws 
StateMigrationException {
+                       @Nullable StateSnapshotTransformer<V> 
snapshotTransformer) throws StateMigrationException {
 
                @SuppressWarnings("unchecked")
                StateTable<K, N, V> stateTable = (StateTable<K, N, V>) 
registeredKVStates.get(stateDesc.getName());
 
                TypeSerializer<V> newStateSerializer = 
stateDesc.getSerializer();
-               RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
-                       stateDesc.getType(),
-                       stateDesc.getName(),
-                       namespaceSerializer,
-                       newStateSerializer,
-                       snapshotTransformer);
 
                if (stateTable != null) {
-                       @SuppressWarnings("unchecked")
-                       StateMetaInfoSnapshot restoredMetaInfoSnapshot =
-                               restoredStateMetaInfo.get(
-                                       StateUID.of(stateDesc.getName(), 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE));
-
-                       Preconditions.checkState(
-                               restoredMetaInfoSnapshot != null,
-                               "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                                       " but its corresponding restored 
snapshot cannot be found.");
+                       RegisteredKeyValueStateBackendMetaInfo<N, V> 
restoredKvMetaInfo = stateTable.getMetaInfo();
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<N> namespaceSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<N>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                                       
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
+                       
restoredKvMetaInfo.updateSnapshotTransformer(snapshotTransformer);
 
                        TypeSerializerSchemaCompatibility<N> 
namespaceCompatibility =
-                               
namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
-                       if (namespaceCompatibility.isIncompatible()) {
-                               throw new StateMigrationException("For heap 
backends, the new namespace serializer must not be incompatible.");
+                               
restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
+                       if (!namespaceCompatibility.isCompatibleAsIs()) {
+                               throw new StateMigrationException("For heap 
backends, the new namespace serializer must be compatible.");
                        }
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<V> stateSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<V>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                                       
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
-
-                       
RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot,
 stateDesc);
+                       restoredKvMetaInfo.checkStateMetaInfo(stateDesc);
 
                        TypeSerializerSchemaCompatibility<V> stateCompatibility 
=
-                               
stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+                               
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
 
                        if (stateCompatibility.isIncompatible()) {
                                throw new StateMigrationException("For heap 
backends, the new state serializer must not be incompatible.");
                        }
 
-                       stateTable.setMetaInfo(newMetaInfo);
+                       stateTable.setMetaInfo(restoredKvMetaInfo);
                } else {
+                       RegisteredKeyValueStateBackendMetaInfo<N, V> 
newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
+                               stateDesc.getType(),
+                               stateDesc.getName(),
+                               namespaceSerializer,
+                               newStateSerializer,
+                               snapshotTransformer);
+
                        stateTable = 
snapshotStrategy.newStateTable(newMetaInfo);
                        registeredKVStates.put(stateDesc.getName(), stateTable);
                }
@@ -536,10 +498,6 @@ private void createOrCheckStateForMetaInfo(
                Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
 
                for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) 
{
-                       restoredStateMetaInfo.put(
-                               StateUID.of(metaInfoSnapshot.getName(), 
metaInfoSnapshot.getBackendStateType()),
-                               metaInfoSnapshot);
-
                        final StateSnapshotRestore registeredState;
 
                        switch (metaInfoSnapshot.getBackendStateType()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
index 1e9d9191079..9b05500e4d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
@@ -81,7 +81,7 @@
 
        /** The configurations of all the type serializers used with the state. 
*/
        @Nonnull
-       private final Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshots;
+       private final Map<String, TypeSerializerSnapshot<?>> 
serializerSnapshots;
 
        // TODO this will go away once all serializers have the 
restoreSerializer() factory method properly implemented.
        /** The serializers used by the state. */
@@ -92,8 +92,8 @@ public StateMetaInfoSnapshot(
                @Nonnull String name,
                @Nonnull BackendStateType backendStateType,
                @Nonnull Map<String, String> options,
-               @Nonnull Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshots) {
-               this(name, backendStateType, options, 
serializerConfigSnapshots, new HashMap<>());
+               @Nonnull Map<String, TypeSerializerSnapshot<?>> 
serializerSnapshots) {
+               this(name, backendStateType, options, serializerSnapshots, new 
HashMap<>());
        }
 
        /**
@@ -106,12 +106,12 @@ public StateMetaInfoSnapshot(
                @Nonnull String name,
                @Nonnull BackendStateType backendStateType,
                @Nonnull Map<String, String> options,
-               @Nonnull Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshots,
+               @Nonnull Map<String, TypeSerializerSnapshot<?>> 
serializerSnapshots,
                @Nonnull Map<String, TypeSerializer<?>> serializers) {
                this.name = name;
                this.backendStateType = backendStateType;
                this.options = options;
-               this.serializerConfigSnapshots = serializerConfigSnapshots;
+               this.serializerSnapshots = serializerSnapshots;
                this.serializers = serializers;
        }
 
@@ -121,13 +121,13 @@ public BackendStateType getBackendStateType() {
        }
 
        @Nullable
-       public TypeSerializerSnapshot<?> 
getTypeSerializerConfigSnapshot(@Nonnull String key) {
-               return serializerConfigSnapshots.get(key);
+       public TypeSerializerSnapshot<?> getTypeSerializerSnapshot(@Nonnull 
String key) {
+               return serializerSnapshots.get(key);
        }
 
        @Nullable
-       public TypeSerializerSnapshot<?> 
getTypeSerializerConfigSnapshot(@Nonnull CommonSerializerKeys key) {
-               return getTypeSerializerConfigSnapshot(key.toString());
+       public TypeSerializerSnapshot<?> getTypeSerializerSnapshot(@Nonnull 
CommonSerializerKeys key) {
+               return getTypeSerializerSnapshot(key.toString());
        }
 
        @Nullable
@@ -150,20 +150,9 @@ public String getName() {
                return name;
        }
 
-       @Nullable
-       public TypeSerializer<?> restoreTypeSerializer(@Nonnull String key) {
-               TypeSerializerSnapshot<?> configSnapshot = 
getTypeSerializerConfigSnapshot(key);
-               return (configSnapshot != null) ? 
configSnapshot.restoreSerializer() : null;
-       }
-
-       @Nullable
-       public TypeSerializer<?> restoreTypeSerializer(@Nonnull 
CommonSerializerKeys key) {
-               return restoreTypeSerializer(key.toString());
-       }
-
        @Nonnull
-       public Map<String, TypeSerializerSnapshot<?>> 
getSerializerConfigSnapshotsImmutable() {
-               return Collections.unmodifiableMap(serializerConfigSnapshots);
+       public Map<String, TypeSerializerSnapshot<?>> 
getSerializerSnapshotsImmutable() {
+               return Collections.unmodifiableMap(serializerSnapshots);
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
index 4408dfcacef..ad1e7be2871 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
@@ -165,7 +165,7 @@ public void writeStateMetaInfoSnapshot(
                        @Nonnull DataOutputView outputView) throws IOException {
                        final Map<String, String> optionsMap = 
snapshot.getOptionsImmutable();
                        final Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshotsMap =
-                               
snapshot.getSerializerConfigSnapshotsImmutable();
+                               snapshot.getSerializerSnapshotsImmutable();
 
                        outputView.writeUTF(snapshot.getName());
                        
outputView.writeInt(snapshot.getBackendStateType().ordinal());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index c1f08e06b3c..55aacb23057 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -251,7 +251,7 @@ private void 
assertEqualStateMetaInfoSnapshots(StateMetaInfoSnapshot expected, S
                Assert.assertEquals(expected.getBackendStateType(), 
actual.getBackendStateType());
                Assert.assertEquals(expected.getOptionsImmutable(), 
actual.getOptionsImmutable());
                Assert.assertEquals(
-                       expected.getSerializerConfigSnapshotsImmutable(),
-                       actual.getSerializerConfigSnapshotsImmutable());
+                       expected.getSerializerSnapshotsImmutable(),
+                       actual.getSerializerSnapshotsImmutable());
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index f5f30d5037a..5511792673e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -34,21 +34,20 @@
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.RunnableFuture;
 
 /**
@@ -69,17 +68,6 @@
        // lazily initialized stream storage
        private CheckpointStorageLocation checkpointStorageLocation;
 
-       /**
-        * The compatibility behaviour of {@link TestSerializer}.
-        * This controls what format the serializer writes in, as well as
-        * the result of the compatibility check against the prior serializer 
snapshot.
-        */
-       public enum SerializerCompatibilityType {
-               COMPATIBLE_AS_IS,
-               REQUIRES_MIGRATION,
-               INCOMPATIBLE
-       }
-
        // 
-------------------------------------------------------------------------------
        //  Keyed state backend migration tests
        // 
-------------------------------------------------------------------------------
@@ -95,7 +83,7 @@ public void testKeyedValueStateMigration() throws Exception {
                try {
                        ValueStateDescriptor<TestType> kvId = new 
ValueStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ValueState<TestType> valueState = backend
                                .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -113,10 +101,10 @@ public void testKeyedValueStateMigration() throws 
Exception {
 
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
 
-                       // the new serializer is REQUIRES_MIGRATION, and has a 
completely new serialization schema.
+                       // the new serializer is V2, and has a completely new 
serialization schema.
                        kvId = new ValueStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+                               new TestType.V2TestTypeSerializer());
                        valueState = backend
                                .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -151,7 +139,7 @@ public void testKeyedListStateMigration() throws Exception {
                try {
                        ListStateDescriptor<TestType> kvId = new 
ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ListState<TestType> listState = backend
                                .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -174,10 +162,10 @@ public void testKeyedListStateMigration() throws 
Exception {
 
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
 
-                       // the new serializer is REQUIRES_MIGRATION, and has a 
completely new serialization schema.
+                       // the new serializer is V2, and has a completely new 
serialization schema.
                        kvId = new ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+                               new TestType.V2TestTypeSerializer());
                        listState = backend
                                .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -221,7 +209,7 @@ public void 
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatib
                try {
                        ValueStateDescriptor<TestType> kvId = new 
ValueStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ValueState<TestType> valueState = backend
                                .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -241,10 +229,12 @@ public void 
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatib
 
                        kvId = new ValueStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
+                               new TestType.IncompatibleTestTypeSerializer());
 
                        // the new serializer is INCOMPATIBLE, so registering 
the state should fail
                        backend.getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+                       Assert.fail("should have failed");
                } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                }finally {
@@ -263,7 +253,7 @@ public void 
testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatibl
                try {
                        ListStateDescriptor<TestType> kvId = new 
ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ListState<TestType> listState = backend
                                .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -288,10 +278,12 @@ public void 
testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatibl
 
                        kvId = new ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+                               new TestType.IncompatibleTestTypeSerializer());
 
                        // the new serializer is INCOMPATIBLE, so registering 
the state should fail
                        backend.getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+                       Assert.fail("should have failed");
                } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                } finally {
@@ -308,7 +300,7 @@ public void 
testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible()
 
                try {
                        InternalPriorityQueue<TestType> internalPriorityQueue = 
backend.create(
-                               "testPriorityQueue", new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               "testPriorityQueue", new 
TestType.V1TestTypeSerializer());
 
                        internalPriorityQueue.add(new TestType("key-1", 123));
                        internalPriorityQueue.add(new TestType("key-2", 346));
@@ -321,7 +313,7 @@ public void 
testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible()
 
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
                        backend.create(
-                               "testPriorityQueue", new 
TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
+                               "testPriorityQueue", new 
TestType.IncompatibleTestTypeSerializer());
 
                        Assert.fail("should have failed");
                } catch (Exception e) {
@@ -337,7 +329,7 @@ public void 
testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() thr
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
 
                AbstractKeyedStateBackend<TestType> backend = 
createKeyedBackend(
-                       new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                       new TestType.V1TestTypeSerializer());
 
                final String stateName = "test-name";
                try {
@@ -357,14 +349,18 @@ public void 
testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() thr
 
                        try {
                                // the new key serializer is incompatible; this 
should fail the restore
-                               restoreKeyedBackend(new 
TestSerializer(SerializerCompatibilityType.INCOMPATIBLE), snapshot);
+                               restoreKeyedBackend(new 
TestType.IncompatibleTestTypeSerializer(), snapshot);
+
+                               Assert.fail("should have failed");
                        } catch (Exception e) {
                                
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                        }
 
                        try {
                                // the new key serializer requires migration; 
this should fail the restore
-                               restoreKeyedBackend(new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION), snapshot);
+                               restoreKeyedBackend(new 
TestType.V2TestTypeSerializer(), snapshot);
+
+                               Assert.fail("should have failed");
                        } catch (Exception e) {
                                
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                        }
@@ -386,7 +382,7 @@ public void 
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatib
                        ValueState<Integer> valueState = backend
                                .getPartitionedState(
                                        new TestType("namespace", 123),
-                                       new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS),
+                                       new TestType.V1TestTypeSerializer(),
                                        kvId);
 
                        backend.setCurrentKey(1);
@@ -397,26 +393,33 @@ public void 
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatib
                        KeyedStateHandle snapshot = runSnapshot(
                                backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
                                sharedStateRegistry);
-                       backend.dispose();
 
+                       // test incompatible namespace serializer; start with a 
freshly restored backend
+                       backend.dispose();
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
-
                        try {
                                // the new namespace serializer is 
incompatible; this should fail the restore
                                backend.getPartitionedState(
                                        new TestType("namespace", 123),
-                                       new 
TestSerializer(SerializerCompatibilityType.INCOMPATIBLE),
+                                       new 
TestType.IncompatibleTestTypeSerializer(),
                                        kvId);
+
+                               Assert.fail("should have failed");
                        } catch (Exception e) {
                                
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                        }
 
+                       // test namespace serializer that requires migration; 
start with a freshly restored backend
+                       backend.dispose();
+                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
                        try {
                                // the new namespace serializer requires 
migration; this should fail the restore
                                backend.getPartitionedState(
                                        new TestType("namespace", 123),
-                                       new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION),
+                                       new TestType.V2TestTypeSerializer(),
                                        kvId);
+
+                               Assert.fail("should have failed");
                        } catch (Exception e) {
                                
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                        }
@@ -439,7 +442,7 @@ public void testOperatorParitionableListStateMigration() 
throws Exception {
                try {
                        ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ListState<TestType> state = 
backend.getListState(descriptor);
 
                        state.add(new TestType("foo", 13));
@@ -453,7 +456,7 @@ public void testOperatorParitionableListStateMigration() 
throws Exception {
 
                        descriptor = new ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+                               new TestType.V2TestTypeSerializer());
                        state = backend.getListState(descriptor);
 
                        // the state backend should have decided whether or not 
it needs to perform state migration;
@@ -478,7 +481,7 @@ public void testUnionListStateMigration() throws Exception {
                try {
                        ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ListState<TestType> state = 
backend.getUnionListState(descriptor);
 
                        state.add(new TestType("foo", 13));
@@ -492,7 +495,7 @@ public void testUnionListStateMigration() throws Exception {
 
                        descriptor = new ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+                               new TestType.V2TestTypeSerializer());
                        state = backend.getUnionListState(descriptor);
 
                        // the state backend should have decided whether or not 
it needs to perform state migration;
@@ -518,7 +521,7 @@ public void testBroadcastStateValueMigration() throws 
Exception {
                        MapStateDescriptor<Integer, TestType> descriptor = new 
MapStateDescriptor<>(
                                stateName,
                                IntSerializer.INSTANCE,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        BroadcastState<Integer, TestType> state = 
backend.getBroadcastState(descriptor);
 
                        state.put(3, new TestType("foo", 13));
@@ -533,7 +536,7 @@ public void testBroadcastStateValueMigration() throws 
Exception {
                        descriptor = new MapStateDescriptor<>(
                                stateName,
                                IntSerializer.INSTANCE,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+                               new TestType.V2TestTypeSerializer());
                        state = backend.getBroadcastState(descriptor);
 
                        // the state backend should have decided whether or not 
it needs to perform state migration;
@@ -556,7 +559,7 @@ public void testBroadcastStateKeyMigration() throws 
Exception {
                try {
                        MapStateDescriptor<TestType, Integer> descriptor = new 
MapStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS),
+                               new TestType.V1TestTypeSerializer(),
                                IntSerializer.INSTANCE);
                        BroadcastState<TestType, Integer> state = 
backend.getBroadcastState(descriptor);
 
@@ -571,7 +574,7 @@ public void testBroadcastStateKeyMigration() throws 
Exception {
 
                        descriptor = new MapStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION),
+                               new TestType.V2TestTypeSerializer(),
                                IntSerializer.INSTANCE);
                        state = backend.getBroadcastState(descriptor);
 
@@ -595,7 +598,7 @@ public void 
testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsI
                try {
                        ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ListState<TestType> state = 
backend.getListState(descriptor);
 
                        state.add(new TestType("foo", 13));
@@ -609,7 +612,7 @@ public void 
testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsI
 
                        descriptor = new ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
+                               new TestType.IncompatibleTestTypeSerializer());
 
                        // the new serializer is INCOMPATIBLE, so registering 
the state should fail
                        backend.getListState(descriptor);
@@ -632,7 +635,7 @@ public void 
testUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() t
                try {
                        ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        ListState<TestType> state = 
backend.getUnionListState(descriptor);
 
                        state.add(new TestType("foo", 13));
@@ -646,7 +649,7 @@ public void 
testUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() t
 
                        descriptor = new ListStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
+                               new TestType.IncompatibleTestTypeSerializer());
 
                        // the new serializer is INCOMPATIBLE, so registering 
the state should fail
                        backend.getUnionListState(descriptor);
@@ -670,7 +673,7 @@ public void 
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatibl
                        MapStateDescriptor<Integer, TestType> descriptor = new 
MapStateDescriptor<>(
                                stateName,
                                IntSerializer.INSTANCE,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS));
+                               new TestType.V1TestTypeSerializer());
                        BroadcastState<Integer, TestType> state = 
backend.getBroadcastState(descriptor);
 
                        state.put(3, new TestType("foo", 13));
@@ -685,10 +688,12 @@ public void 
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatibl
                        descriptor = new MapStateDescriptor<>(
                                stateName,
                                IntSerializer.INSTANCE,
-                               new 
TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+                               new TestType.IncompatibleTestTypeSerializer());
 
                        // the new value serializer is INCOMPATIBLE, so 
registering the state should fail
                        backend.getBroadcastState(descriptor);
+
+                       Assert.fail("should have failed.");
                } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                } finally {
@@ -706,7 +711,7 @@ public void 
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible(
                try {
                        MapStateDescriptor<TestType, Integer> descriptor = new 
MapStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS),
+                               new TestType.V1TestTypeSerializer(),
                                IntSerializer.INSTANCE);
                        BroadcastState<TestType, Integer> state = 
backend.getBroadcastState(descriptor);
 
@@ -721,11 +726,13 @@ public void 
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible(
 
                        descriptor = new MapStateDescriptor<>(
                                stateName,
-                               new 
TestSerializer(SerializerCompatibilityType.INCOMPATIBLE),
+                               new TestType.IncompatibleTestTypeSerializer(),
                                IntSerializer.INSTANCE);
 
                        // the new key serializer is INCOMPATIBLE, so 
registering the state should fail
                        backend.getBroadcastState(descriptor);
+
+                       Assert.fail("should have failed.");
                } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
                } finally {
@@ -737,223 +744,6 @@ public void 
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible(
        //  Test types, serializers, and serializer snapshots
        // 
-------------------------------------------------------------------------------
 
-       /**
-        * The type used as state under tests.
-        *
-        * <p>This is implemented so that the type can also be used as keyed 
priority queue state.
-        */
-       private static class TestType implements HeapPriorityQueueElement, 
PriorityComparable<TestType>, Keyed<String> {
-
-               private int index;
-
-               private final int value;
-               private final String key;
-
-               public TestType(String key, int value) {
-                       this.key = key;
-                       this.value = value;
-               }
-
-               @Override
-               public String getKey() {
-                       return key;
-               }
-
-               @Override
-               public int comparePriorityTo(@Nonnull TestType other) {
-                       return Integer.compare(value, other.value);
-               }
-
-               @Override
-               public int getInternalIndex() {
-                       return index;
-               }
-
-               @Override
-               public void setInternalIndex(int newIndex) {
-                       this.index = newIndex;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj == null || !(obj instanceof 
StateBackendMigrationTestBase.TestType)) {
-                               return false;
-                       }
-
-                       if (obj == this) {
-                               return true;
-                       }
-
-                       TestType other = (TestType) obj;
-                       return Objects.equals(key, other.key) && value == 
other.value;
-               }
-
-               @Override
-               public int hashCode() {
-                       return 31 * key.hashCode() + value;
-               }
-       }
-
-       private static class TestSerializer extends TypeSerializer<TestType> {
-
-               private static final String MIGRATION_PAYLOAD = 
"random-migration-payload";
-
-               private final SerializerCompatibilityType compatibilityType;
-
-               TestSerializer(SerializerCompatibilityType compatibilityType) {
-                       this.compatibilityType = compatibilityType;
-               }
-
-               // 
--------------------------------------------------------------------------------
-               //  State serialization relevant methods
-               // 
--------------------------------------------------------------------------------
-
-               @Override
-               public void serialize(TestType record, DataOutputView target) 
throws IOException {
-                       switch (compatibilityType) {
-                               case COMPATIBLE_AS_IS:
-                                       target.writeUTF(record.getKey());
-                                       target.writeInt(record.value);
-                                       break;
-
-                               case REQUIRES_MIGRATION:
-                                       target.writeUTF(record.getKey());
-                                       target.writeUTF(MIGRATION_PAYLOAD);
-                                       target.writeInt(record.value);
-                                       target.writeBoolean(true);
-                                       break;
-
-                               case INCOMPATIBLE:
-                                       // the serializer shouldn't be used in 
this case
-                                       throw new 
UnsupportedOperationException();
-                       }
-               }
-
-               @Override
-               public TestType deserialize(DataInputView source) throws 
IOException {
-                       String key;
-                       int value;
-
-                       switch (compatibilityType) {
-                               case COMPATIBLE_AS_IS:
-                                       key = source.readUTF();
-                                       value = source.readInt();
-                                       break;
-
-                               case REQUIRES_MIGRATION:
-                                       key = source.readUTF();
-                                       Assert.assertEquals(MIGRATION_PAYLOAD, 
source.readUTF());
-                                       value = source.readInt();
-                                       Assert.assertTrue(source.readBoolean());
-                                       break;
-
-                               default:
-                               case INCOMPATIBLE:
-                                       // the serializer shouldn't be used in 
this case
-                                       throw new 
UnsupportedOperationException();
-                       }
-
-                       return new TestType(key, value);
-               }
-
-               @Override
-               public TestType copy(TestType from) {
-                       return new TestType(from.key, from.value);
-               }
-
-               @Override
-               public TypeSerializerSnapshot<TestType> snapshotConfiguration() 
{
-                       return new TestSerializerSnapshot();
-               }
-
-               // 
--------------------------------------------------------------------------------
-               //  Miscellaneous serializer methods
-               // 
--------------------------------------------------------------------------------
-
-               @Override
-               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
-                       serialize(deserialize(source), target);
-               }
-
-               @Override
-               public TestType deserialize(TestType reuse, DataInputView 
source) throws IOException {
-                       return deserialize(source);
-               }
-
-               @Override
-               public TestType copy(TestType from, TestType reuse) {
-                       return copy(from);
-               }
-
-               @Override
-               public TestType createInstance() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public TypeSerializer<TestType> duplicate() {
-                       return this;
-               }
-
-               @Override
-               public boolean isImmutableType() {
-                       return false;
-               }
-
-               @Override
-               public int getLength() {
-                       return -1;
-               }
-
-               @Override
-               public boolean canEqual(Object obj) {
-                       return getClass().equals(obj.getClass());
-               }
-
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       return obj == this;
-               }
-       }
-
-       public static class TestSerializerSnapshot implements 
TypeSerializerSnapshot<TestType> {
-
-               @Override
-               public int getCurrentVersion() {
-                       return 1;
-               }
-
-               @Override
-               public TypeSerializer<TestType> restoreSerializer() {
-                       return new 
TestSerializer(SerializerCompatibilityType.COMPATIBLE_AS_IS);
-               }
-
-               @Override
-               public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(TypeSerializer<TestType> newSerializer) {
-                       switch (((TestSerializer) 
newSerializer).compatibilityType) {
-                               case COMPATIBLE_AS_IS:
-                                       return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
-                               case REQUIRES_MIGRATION:
-                                       return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
-                               case INCOMPATIBLE:
-                                       return 
TypeSerializerSchemaCompatibility.incompatible();
-                               default:
-                                       throw new 
UnsupportedOperationException();
-                       }
-               }
-
-               @Override
-               public void writeSnapshot(DataOutputView out) throws 
IOException {}
-
-               @Override
-               public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {}
-       }
-
        public static class CustomVoidNamespaceSerializer extends 
TypeSerializer<VoidNamespace> {
 
                private static final long serialVersionUID = 1L;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 54cac1ea177..a9bd1f694a3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -149,14 +149,6 @@
        @Rule
        public final ExpectedException expectedException = 
ExpectedException.none();
 
-       /**
-        * The serialization timeliness behaviour of the state backend under 
test.
-        */
-       public enum BackendSerializationTimeliness {
-               ON_ACCESS,
-               ON_CHECKPOINTS
-       }
-
        // lazily initialized stream storage
        private CheckpointStorageLocation checkpointStorageLocation;
 
@@ -3145,6 +3137,57 @@ public void testMapStateDefaultValue() throws Exception {
                backend.dispose();
        }
 
+       /**
+        * Verifies that restored state which is never accessed after the restore is still
+        * part of the next snapshot: map state is written for three keys, snapshotted and
+        * restored; a second snapshot is then taken without touching the state, and the
+        * values are finally read back from a backend restored from that second snapshot.
+        */
+       @Test
+       public void testSnapshotNonAccessedState() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               AbstractKeyedStateBackend<String> backend = 
createKeyedBackend(StringSerializer.INSTANCE);
+
+               final String stateName = "test-name";
+               try {
+                       MapStateDescriptor<Integer, String> kvId = new 
MapStateDescriptor<>(stateName, Integer.class, String.class);
+                       MapState<Integer, String> mapState = backend
+                               .getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // write some state to be snapshotted
+                       backend.setCurrentKey("1");
+                       mapState.put(11, "foo");
+                       backend.setCurrentKey("2");
+                       mapState.put(8, "bar");
+                       backend.setCurrentKey("3");
+                       mapState.put(91, "hello world");
+
+                       // take a snapshot, and then restore backend with 
snapshot
+                       KeyedStateHandle snapshot = runSnapshot(
+                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                               sharedStateRegistry);
+                       backend.dispose();
+
+                       backend = 
restoreKeyedBackend(StringSerializer.INSTANCE, snapshot);
+
+                       // now take a snapshot again without accessing the state
+                       // (the state descriptor is deliberately not registered on this restored backend)
+                       snapshot = runSnapshot(
+                               backend.snapshot(2L, 3L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                               sharedStateRegistry);
+                       backend.dispose();
+
+                       // we restore again and try to access previous state
+                       backend = 
restoreKeyedBackend(StringSerializer.INSTANCE, snapshot);
+                       mapState = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // every entry written before the first snapshot must have survived both snapshots
+                       backend.setCurrentKey("1");
+                       assertEquals("foo", mapState.get(11));
+                       backend.setCurrentKey("2");
+                       assertEquals("bar", mapState.get(8));
+                       backend.setCurrentKey("3");
+                       assertEquals("hello world", mapState.get(91));
+
+                       snapshot.discardState();
+               } finally {
+                       backend.dispose();
+               }
+       }
+
        /**
         * This test verifies that state is correctly assigned to key groups 
and that restore
         * restores the relevant key groups in the backend.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
new file mode 100644
index 00000000000..de1f2bd962c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test suite for {@link StateSerializerProvider}.
+ */
+public class StateSerializerProviderTest {
+
+       // 
--------------------------------------------------------------------------------
+       //  Tests for #currentSchemaSerializer()
+       // 
--------------------------------------------------------------------------------
+
+       @Test
+       public void testCurrentSchemaSerializerForNewStateSerializerProvider() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+               assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void 
testCurrentSchemaSerializerForRestoredStateSerializerProvider() {
+               TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+       }
+
+       // 
--------------------------------------------------------------------------------
+       //  Tests for #previousSchemaSerializer()
+       // 
--------------------------------------------------------------------------------
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testPreviousSchemaSerializerForNewStateSerializerProvider() 
{
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+
+               // this should fail with an exception
+               testProvider.previousSchemaSerializer();
+       }
+
+       @Test
+       public void 
testPreviousSchemaSerializerForRestoredStateSerializerProvider() {
+               TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               assertTrue(testProvider.previousSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testLazyInstantiationOfPreviousSchemaSerializer() {
+               // create the provider with an exception throwing snapshot;
+               // this would throw an exception if the restore serializer was 
eagerly accessed
+               StateSerializerProvider<String> testProvider =
+                       StateSerializerProvider.fromRestoredState(new 
ExceptionThrowingSerializerSnapshot());
+
+               try {
+                       // if we fail here, that means the restore serializer 
was indeed lazily accessed
+                       testProvider.previousSchemaSerializer();
+                       fail("expected to fail when accessing the restore 
serializer.");
+               } catch (Exception expected) {
+                       // success
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------
+       //  Tests for #registerNewSerializerForRestoredState(TypeSerializer)
+       // 
--------------------------------------------------------------------------------
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void 
testRegisterNewSerializerWithNewStateSerializerProviderShouldFail() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+               testProvider.registerNewSerializerForRestoredState(new 
TestType.V2TestTypeSerializer());
+       }
+
+       /**
+        * A provider created from restored state accepts exactly one new serializer
+        * registration; a second registration must be rejected with an
+        * {@link UnsupportedOperationException}.
+        *
+        * <p>Note: despite the method name, the provider under test is created via
+        * {@code fromRestoredState}; the name is kept unchanged.
+        */
+       @Test
+       public void testRegisterNewSerializerTwiceWithNewStateSerializerProviderShouldFail() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               // the first registration is legal; an exception thrown here must fail the test
+               // instead of being mistaken for the expected failure of the second registration
+               testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
+
+               try {
+                       // second registration should fail
+                       testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
+                       fail("expected the second serializer registration to fail.");
+               } catch (UnsupportedOperationException expected) {
+                       // success
+               }
+       }
+
+       @Test
+       public void testRegisterNewCompatibleAsIsSerializer() {
+               TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               // register compatible serializer for state
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
+                       testProvider.registerNewSerializerForRestoredState(new 
TestType.V1TestTypeSerializer());
+               assertTrue(schemaCompatibility.isCompatibleAsIs());
+
+               assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+               assertTrue(testProvider.previousSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testRegisterNewCompatibleAfterMigrationSerializer() {
+               TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               // register serializer that requires migration for state
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
+                       testProvider.registerNewSerializerForRestoredState(new 
TestType.V2TestTypeSerializer());
+               assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+       }
+
+       /**
+        * Registering an incompatible serializer for restored state must be reported as
+        * incompatible, and afterwards the provider must refuse to hand out a serializer
+        * for the current schema.
+        */
+       @Test
+       public void testRegisterIncompatibleSerializer() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               // register a serializer that is incompatible with the restored state
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+                       testProvider.registerNewSerializerForRestoredState(new TestType.IncompatibleTestTypeSerializer());
+               assertTrue(schemaCompatibility.isIncompatible());
+
+               try {
+                       // a serializer for the current schema will no longer be accessible
+                       testProvider.currentSchemaSerializer();
+
+                       // without this, the test would silently pass when no exception is thrown;
+                       // fail() raises an AssertionError, which the catch below does not swallow
+                       fail("expected to fail when accessing the current schema serializer.");
+               } catch (Exception expected) {
+                       // success
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------
+       //  Utilities
+       // 
--------------------------------------------------------------------------------
+
+       /**
+        * A {@link TypeSerializerSnapshot} whose every method throws an
+        * {@link UnsupportedOperationException}. The lazy-instantiation test uses it so
+        * that any eager call to {@link #restoreSerializer()} surfaces as an exception.
+        */
+       public static class ExceptionThrowingSerializerSnapshot implements 
TypeSerializerSnapshot<String> {
+
+               @Override
+               public TypeSerializer<String> restoreSerializer() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void writeSnapshot(DataOutputView out) throws 
IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public TypeSerializerSchemaCompatibility<String> 
resolveSchemaCompatibility(TypeSerializer<String> newSerializer) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public int getCurrentVersion() {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
new file mode 100644
index 00000000000..e3b0a066e51
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils.statemigration;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.junit.Assert;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A data type used as state in state migration tests.
+ *
+ * <p>This is implemented so that the type can also be used as keyed priority 
queue state.
+ */
+public class TestType implements HeapPriorityQueueElement, 
PriorityComparable<TestType>, Keyed<String> {
+
+    private int index;
+
+    private final int value;
+    private final String key;
+
+    public TestType(String key, int value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public String getKey() {
+        return key;
+    }
+
+       public int getValue() {
+               return value;
+       }
+
+       @Override
+    public int comparePriorityTo(@Nonnull TestType other) {
+        return Integer.compare(value, other.value);
+    }
+
+    @Override
+    public int getInternalIndex() {
+        return index;
+    }
+
+    @Override
+    public void setInternalIndex(int newIndex) {
+        this.index = newIndex;
+    }
+
+    /**
+     * Two instances are equal when both their key and their value match.
+     * The internal index is not compared.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        // instanceof is false for null, so no separate null check is required
+        if (!(obj instanceof TestType)) {
+            return false;
+        }
+
+        TestType other = (TestType) obj;
+        return value == other.value && Objects.equals(key, other.key);
+    }
+
+    /**
+     * Hash code derived from key and value, consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        // equals() tolerates a null key via Objects.equals, so hashCode() must not throw on it;
+        // for non-null keys the result is unchanged
+        return 31 * Objects.hashCode(key) + value;
+    }
+
+       /**
+        * A serializer that reads / writes {@link TestType} in schema version 1.
+        */
+       public static class V1TestTypeSerializer extends TestTypeSerializerBase 
{
+               private static final long serialVersionUID = 
5053346160938769779L;
+
+               @Override
+               public void serialize(TestType record, DataOutputView target) 
throws IOException {
+                       // schema version 1 layout: key (UTF) followed by value (int)
+                       target.writeUTF(record.getKey());
+                       target.writeInt(record.getValue());
+               }
+
+               @Override
+               public TestType deserialize(DataInputView source) throws 
IOException {
+                       // reads the fields in the same order serialize() wrote them
+                       return new TestType(source.readUTF(), source.readInt());
+               }
+
+               @Override
+               public TypeSerializerSnapshot<TestType> snapshotConfiguration() 
{
+                       return new V1TestTypeSerializerSnapshot();
+               }
+       }
+
+       /**
+        * A serializer that reads / writes {@link TestType} in schema version 2.
+        * Migration is required if the state was previously written with 
{@link V1TestTypeSerializer}.
+        */
+       public static class V2TestTypeSerializer extends TestTypeSerializerBase 
{
+
+               private static final long serialVersionUID = 
7199590310936186578L;
+
+               private static final String RANDOM_PAYLOAD = "random-payload";
+
+               @Override
+               public void serialize(TestType record, DataOutputView target) 
throws IOException {
+                       // schema version 2 layout: key (UTF), fixed payload (UTF), value (int), flag (boolean)
+                       target.writeUTF(record.getKey());
+                       target.writeUTF(RANDOM_PAYLOAD);
+                       target.writeInt(record.getValue());
+                       target.writeBoolean(true);
+               }
+
+               @Override
+               public TestType deserialize(DataInputView source) throws 
IOException {
+                       // the extra payload and flag are asserted so that bytes written in a
+                       // different layout are detected instead of being silently misread
+                       String key = source.readUTF();
+                       Assert.assertEquals(RANDOM_PAYLOAD, source.readUTF());
+                       int value = source.readInt();
+                       Assert.assertTrue(source.readBoolean());
+
+                       return new TestType(key, value);
+               }
+
+               // NOTE(review): this returns the V1 snapshot class rather than a V2-specific
+               // snapshot - confirm this is intended for state that has already been
+               // written in the version 2 layout.
+               @Override
+               public TypeSerializerSnapshot<TestType> snapshotConfiguration() 
{
+                       return new V1TestTypeSerializerSnapshot();
+               }
+       }
+
+       /**
+        * A serializer that is meant to be incompatible with any of the 
serializers.
+        */
+       public static class IncompatibleTestTypeSerializer extends 
TestTypeSerializerBase {
+
+               private static final long serialVersionUID = 
-2959080770523247215L;
+
+               @Override
+               public void serialize(TestType record, DataOutputView target) 
throws IOException {
+                       throw new UnsupportedOperationException("This is an 
incompatible serializer; shouldn't be used.");
+               }
+
+               @Override
+               public TestType deserialize(DataInputView source) throws 
IOException {
+                       throw new UnsupportedOperationException("This is an 
incompatible serializer; shouldn't be used.");
+               }
+
+               @Override
+               public TypeSerializerSnapshot<TestType> snapshotConfiguration() 
{
+                       throw new UnsupportedOperationException("This is an 
incompatible serializer; shouldn't be used.");
+               }
+       }
+
+       public static abstract class TestTypeSerializerBase extends 
TypeSerializer<TestType> {
+
+               private static final long serialVersionUID = 
256299937766275871L;
+
+               // 
--------------------------------------------------------------------------------
+               //  Miscellaneous serializer methods
+               // 
--------------------------------------------------------------------------------
+
+               @Override
+               public TestType copy(TestType from) {
+                       return new TestType(from.getKey(), from.getValue());
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       serialize(deserialize(source), target);
+               }
+
+               @Override
+               public TestType deserialize(TestType reuse, DataInputView 
source) throws IOException {
+                       return deserialize(source);
+               }
+
+               @Override
+               public TestType copy(TestType from, TestType reuse) {
+                       return copy(from);
+               }
+
+               @Override
+               public TestType createInstance() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public TypeSerializer<TestType> duplicate() {
+                       return this;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public int getLength() {
+                       return -1;
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return getClass().equals(obj.getClass());
+               }
+
+               @Override
+               public int hashCode() {
+                       return getClass().hashCode();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj == this;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
new file mode 100644
index 00000000000..b2b802a30c2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils.statemigration;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Snapshot class for {@link TestType.V1TestTypeSerializer}; it carries no data and resolves compatibility from the new serializer's class alone.
+ */
+public class V1TestTypeSerializerSnapshot implements 
TypeSerializerSnapshot<TestType> {
+
+       @Override
+       public int getCurrentVersion() {
+               return 1;
+       }
+
+       @Override
+       public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(TypeSerializer<TestType> newSerializer) {
+               if (newSerializer instanceof TestType.V1TestTypeSerializer) {
+                       return 
TypeSerializerSchemaCompatibility.compatibleAsIs(); // V1 snapshot, V1 serializer
+               } else if (newSerializer instanceof 
TestType.V2TestTypeSerializer) {
+                       return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration(); // V1 snapshot, V2 serializer
+               } else {
+                       return TypeSerializerSchemaCompatibility.incompatible(); // any other serializer class
+               }
+       }
+
+       @Override
+       public TypeSerializer<TestType> restoreSerializer() {
+               return new TestType.V1TestTypeSerializer(); // a fresh V1 serializer on every call
+       }
+
+       @Override
+    public void writeSnapshot(DataOutputView out) throws IOException { // no-op: nothing is written to 'out'
+       }
+
+    @Override
+    public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException { // no-op: nothing is read; all three arguments are unused
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
new file mode 100644
index 00000000000..3cd4fff8d86
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils.statemigration;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Snapshot class for {@link TestType.V2TestTypeSerializer}; it carries no data and accepts only a V2 serializer as the new serializer.
+ */
+public class V2TestTypeSerializerSnapshot implements 
TypeSerializerSnapshot<TestType> {
+
+       @Override
+       public int getCurrentVersion() {
+               return 1;
+       }
+
+       @Override
+       public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(TypeSerializer<TestType> newSerializer) {
+               if (newSerializer instanceof TestType.V2TestTypeSerializer) {
+                       return 
TypeSerializerSchemaCompatibility.compatibleAsIs(); // V2 snapshot, V2 serializer
+               } else {
+                       return TypeSerializerSchemaCompatibility.incompatible(); // any other serializer class, including V1
+               }
+       }
+
+       @Override
+       public TypeSerializer<TestType> restoreSerializer() {
+               return new TestType.V2TestTypeSerializer(); // a fresh V2 serializer on every call
+       }
+
+       @Override
+       public void writeSnapshot(DataOutputView out) throws IOException { // no-op: nothing is written to 'out'
+       }
+
+       @Override
+       public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException { // no-op: nothing is read; all three arguments are unused
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 50caa0d912a..a37f8aa8df8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -29,7 +29,6 @@
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -112,7 +111,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -213,14 +211,6 @@
         */
        private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> kvStateInformation;
 
-       /**
-        * Map of state names to their corresponding restored state meta info.
-        *
-        * <p>TODO this map can be removed when eager-state registration is in 
place.
-        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
-        */
-       private final Map<String, StateMetaInfoSnapshot> 
restoredKvStateMetaInfos;
-
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
 
@@ -297,7 +287,6 @@ public RocksDBKeyedStateBackend(
                this.keyGroupPrefixBytes =
                        
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
                this.kvStateInformation = new LinkedHashMap<>();
-               this.restoredKvStateMetaInfos = new HashMap<>();
 
                this.writeOptions = new WriteOptions().setDisableWAL(true);
 
@@ -425,7 +414,6 @@ public void dispose() {
                        IOUtils.closeQuietly(dbOptions);
                        IOUtils.closeQuietly(writeOptions);
                        kvStateInformation.clear();
-                       restoredKvStateMetaInfos.clear();
 
                        cleanInstanceBasePath();
                }
@@ -511,7 +499,6 @@ public void restore(Collection<KeyedStateHandle> 
restoreState) throws Exception
 
                // clear all meta data
                kvStateInformation.clear();
-               restoredKvStateMetaInfos.clear();
 
                try {
                        RocksDBIncrementalRestoreOperation<K> 
incrementalRestoreOperation = null;
@@ -754,11 +741,13 @@ private void restoreKVStateMetaData() throws IOException, 
StateMigrationExceptio
                                                nameBytes,
                                                
rocksDBKeyedStateBackend.columnOptions);
 
-                                       
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
-
                                        ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-                                       registeredColumn = new 
Tuple2<>(columnFamily, null);
+                                       // create a meta info for the state on 
restore;
+                                       // this allows us to retain the state 
in future snapshots even if it wasn't accessed
+                                       RegisteredStateMetaInfoBase 
stateMetaInfo =
+                                               
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
+                                       registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
                                        
rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), 
registeredColumn);
 
                                } else {
@@ -1069,10 +1058,14 @@ private ColumnFamilyHandle 
getOrRegisterColumnFamilyHandle(
                                
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
                        if (null == registeredStateMetaInfoEntry) {
+                               // create a meta info for the state on restore;
+                               // this allows us to retain the state in future 
snapshots even if it wasn't accessed
+                               RegisteredStateMetaInfoBase stateMetaInfo =
+                                       
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
                                registeredStateMetaInfoEntry =
                                        new Tuple2<>(
                                                columnFamilyHandle != null ? 
columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
-                                               null);
+                                               stateMetaInfo);
 
                                stateBackend.registerKvStateInformation(
                                        stateMetaInfoSnapshot.getName(),
@@ -1159,7 +1152,6 @@ private void initTargetDB(
                                        stateBackend.columnOptions);
 
                                
columnFamilyDescriptors.add(columnFamilyDescriptor);
-                               
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
                        }
                        return columnFamilyDescriptors;
                }
@@ -1201,9 +1193,13 @@ private void restoreLocalStateIntoFullInstance(
 
                                ColumnFamilyHandle columnFamilyHandle = 
columnFamilyHandles.get(i);
 
+                               // create a meta info for the state on restore;
+                               // this allows us to retain the state in future 
snapshots even if it wasn't accessed
+                               RegisteredStateMetaInfoBase stateMetaInfo =
+                                       
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
                                stateBackend.registerKvStateInformation(
                                        stateMetaInfoSnapshot.getName(),
-                                       new Tuple2<>(columnFamilyHandle, null));
+                                       new Tuple2<>(columnFamilyHandle, 
stateMetaInfo));
                        }
 
                        // use the restore sst files as the base for succeeding 
checkpoints
@@ -1365,78 +1361,69 @@ private void copyStateDataHandleData(
                        TypeSerializer<N> namespaceSerializer,
                        @Nullable StateSnapshotTransformer<SV> 
snapshotTransformer) throws Exception {
 
-               Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
stateInfo =
+               Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
oldStateInfo =
                        kvStateInformation.get(stateDesc.getName());
 
                TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
-               RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
-                       stateDesc.getType(),
-                       stateDesc.getName(),
-                       namespaceSerializer,
-                       stateSerializer,
-                       snapshotTransformer);
-
-               if (stateInfo != null) {
-                       newMetaInfo = migrateStateIfNecessary(
-                               newMetaInfo,
+
+               ColumnFamilyHandle newColumnFamily;
+               RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo;
+               if (oldStateInfo != null) {
+                       @SuppressWarnings("unchecked")
+                       RegisteredKeyValueStateBackendMetaInfo<N, SV> 
castedMetaInfo = (RegisteredKeyValueStateBackendMetaInfo<N, SV>) 
oldStateInfo.f1;
+
+                       newMetaInfo = updateRestoredStateMetaInfo(
+                               Tuple2.of(oldStateInfo.f0, castedMetaInfo),
                                stateDesc,
                                namespaceSerializer,
                                stateSerializer,
-                               stateInfo);
+                               snapshotTransformer);
 
-                       stateInfo.f1 = newMetaInfo;
+                       oldStateInfo.f1 = newMetaInfo;
+                       newColumnFamily = oldStateInfo.f0;
                } else {
-                       ColumnFamilyHandle columnFamily = 
createColumnFamily(stateDesc.getName());
+                       newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+                               stateDesc.getType(),
+                               stateDesc.getName(),
+                               namespaceSerializer,
+                               stateSerializer,
+                               snapshotTransformer);
 
-                       stateInfo = Tuple2.of(columnFamily, newMetaInfo);
-                       registerKvStateInformation(stateDesc.getName(), 
stateInfo);
+                       newColumnFamily = 
createColumnFamily(stateDesc.getName());
+                       registerKvStateInformation(stateDesc.getName(), 
Tuple2.of(newColumnFamily, newMetaInfo));
                }
 
-               return Tuple2.of(stateInfo.f0, newMetaInfo);
+               return Tuple2.of(newColumnFamily, newMetaInfo);
        }
 
-       private <N, S extends State, SV> 
RegisteredKeyValueStateBackendMetaInfo<N, SV> migrateStateIfNecessary(
-                       RegisteredKeyValueStateBackendMetaInfo<N, SV> 
newMetaInfo,
+       private <N, S extends State, SV> 
RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(
+                       Tuple2<ColumnFamilyHandle, 
RegisteredKeyValueStateBackendMetaInfo<N, SV>> oldStateInfo,
                        StateDescriptor<S, SV> stateDesc,
                        TypeSerializer<N> namespaceSerializer,
                        TypeSerializer<SV> stateSerializer,
-                       Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
stateInfo) throws Exception {
-
-               StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
-
-               Preconditions.checkState(
-                       restoredMetaInfoSnapshot != null,
-                       "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                               " but its corresponding restored snapshot 
cannot be found.");
+                       @Nullable StateSnapshotTransformer<SV> 
snapshotTransformer) throws Exception {
 
                @SuppressWarnings("unchecked")
-               TypeSerializerSnapshot<N> namespaceSerializerSnapshot = 
Preconditions.checkNotNull(
-                       (TypeSerializerSnapshot<N>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
+               RegisteredKeyValueStateBackendMetaInfo<N, SV> 
restoredKvStateMetaInfo = oldStateInfo.f1;
+
+               
restoredKvStateMetaInfo.updateSnapshotTransformer(snapshotTransformer);
 
-               TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
-                       
namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
-               if (!namespaceCompatibility.isCompatibleAsIs()) {
+               TypeSerializerSchemaCompatibility<N> s = 
restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
+               if (!s.isCompatibleAsIs()) {
                        throw new StateMigrationException("The new namespace 
serializer must be compatible.");
                }
 
-               @SuppressWarnings("unchecked")
-               TypeSerializerSnapshot<SV> stateSerializerSnapshot = 
Preconditions.checkNotNull(
-                       (TypeSerializerSnapshot<SV>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
-
-               
RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot,
 stateDesc);
-
-               TypeSerializerSchemaCompatibility<SV> stateCompatibility =
-                       
stateSerializerSnapshot.resolveSchemaCompatibility(stateSerializer);
+               restoredKvStateMetaInfo.checkStateMetaInfo(stateDesc);
 
-               if (stateCompatibility.isCompatibleAfterMigration()) {
-                       migrateStateValues(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo, stateSerializer);
-               } else if (stateCompatibility.isIncompatible()) {
+               TypeSerializerSchemaCompatibility<SV> 
newStateSerializerCompatibility =
+                       
restoredKvStateMetaInfo.updateStateSerializer(stateSerializer);
+               if 
(newStateSerializerCompatibility.isCompatibleAfterMigration()) {
+                       migrateStateValues(stateDesc, oldStateInfo);
+               } else if (newStateSerializerCompatibility.isIncompatible()) {
                        throw new StateMigrationException("The new state 
serializer cannot be incompatible.");
                }
 
-               return newMetaInfo;
+               return restoredKvStateMetaInfo;
        }
 
        /**
@@ -1446,10 +1433,7 @@ private void copyStateDataHandleData(
         */
        private <N, S extends State, SV> void migrateStateValues(
                StateDescriptor<S, SV> stateDesc,
-               Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
stateInfo,
-               StateMetaInfoSnapshot restoredMetaInfoSnapshot,
-               RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo,
-               TypeSerializer<SV> newStateSerializer) throws Exception {
+               Tuple2<ColumnFamilyHandle, 
RegisteredKeyValueStateBackendMetaInfo<N, SV>> stateMetaInfo) throws Exception {
 
                if (stateDesc.getType() == StateDescriptor.Type.MAP) {
                        throw new StateMigrationException("The new serializer 
for a MapState requires state migration in order for the job to proceed." +
@@ -1471,7 +1455,7 @@ private void copyStateDataHandleData(
                }
                State state = stateFactory.createState(
                        stateDesc,
-                       Tuple2.of(stateInfo.f0, newMetaInfo),
+                       stateMetaInfo,
                        RocksDBKeyedStateBackend.this);
                if (!(state instanceof AbstractRocksDBState)) {
                        throw new FlinkRuntimeException(
@@ -1483,16 +1467,11 @@ private void copyStateDataHandleData(
 
                Snapshot rocksDBSnapshot = db.getSnapshot();
                try (
-                       RocksIteratorWrapper iterator = getRocksIterator(db, 
stateInfo.f0);
+                       RocksIteratorWrapper iterator = getRocksIterator(db, 
stateMetaInfo.f0);
                        RocksDBWriteBatchWrapper batchWriter = new 
RocksDBWriteBatchWrapper(db, getWriteOptions())
                ) {
                        iterator.seekToFirst();
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<SV> priorValueSerializerSnapshot 
= (TypeSerializerSnapshot<SV>)
-                               
Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-                       TypeSerializer<SV> priorValueSerializer = 
priorValueSerializerSnapshot.restoreSerializer();
-
                        DataInputDeserializer serializedValueInput = new 
DataInputDeserializer();
                        DataOutputSerializer migratedSerializedValueOutput = 
new DataOutputSerializer(512);
                        while (iterator.isValid()) {
@@ -1501,10 +1480,10 @@ private void copyStateDataHandleData(
                                rocksDBState.migrateSerializedValue(
                                        serializedValueInput,
                                        migratedSerializedValueOutput,
-                                       priorValueSerializer,
-                                       newStateSerializer);
+                                       
stateMetaInfo.f1.getPreviousStateSerializer(),
+                                       stateMetaInfo.f1.getStateSerializer());
 
-                               batchWriter.put(stateInfo.f0, iterator.key(), 
migratedSerializedValueOutput.getCopyOfBuffer());
+                               batchWriter.put(stateMetaInfo.f0, 
iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer());
 
                                migratedSerializedValueOutput.clear();
                                iterator.next();
@@ -1698,25 +1677,16 @@ public static RocksIteratorWrapper getRocksIterator(
                        // TODO we implement the simple way of supporting the 
current functionality, mimicking keyed state
                        // because this should be reworked in FLINK-9376 and 
then we should have a common algorithm over
                        // StateMetaInfoSnapshot that avoids this code 
duplication.
-                       StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateName);
-
-                       Preconditions.checkState(
-                               restoredMetaInfoSnapshot != null,
-                               "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                                       " but its corresponding restored 
snapshot cannot be found.");
 
-                       StateMetaInfoSnapshot.CommonSerializerKeys 
serializerKey =
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
-
-                       TypeSerializer<?> metaInfoTypeSerializer = 
restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey);
+                       @SuppressWarnings("unchecked")
+                       RegisteredPriorityQueueStateBackendMetaInfo<T> 
castedMetaInfo =
+                               
(RegisteredPriorityQueueStateBackendMetaInfo<T>) metaInfoTuple.f1;
 
-                       if (metaInfoTypeSerializer != 
byteOrderedElementSerializer) {
-                               @SuppressWarnings("unchecked")
-                               TypeSerializerSnapshot<T> serializerSnapshot = 
Preconditions.checkNotNull(
-                                       (TypeSerializerSnapshot<T>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+                       TypeSerializer<T> previousElementSerializer = 
castedMetaInfo.getPreviousElementSerializer();
 
+                       if (previousElementSerializer != 
byteOrderedElementSerializer) {
                                TypeSerializerSchemaCompatibility<T> 
compatibilityResult =
-                                       
serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
+                                       
castedMetaInfo.updateElementSerializer(byteOrderedElementSerializer);
 
                                // Since priority queue elements are written 
into RocksDB
                                // as keys prefixed with the key group and 
namespace, we do not support


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to