This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ba346cade85511086c08c634cc34b1742d9d98c8
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Sun Dec 9 15:15:04 2018 +0800

    [FLINK-11094] [state backends] Let meta infos always lazily access restore serializer
    
    This commit introduces StateSerializerProvider that
    wraps logic on how to obtain serializers for registered state,
    either with the previous schema of state in checkpoints or
    the current schema of state.
    
    All state meta info subclasses use
    StateSerializerProviders to replace direct serializer instances. This
    allows meta infos that were instantiated with restored serializer
    snapshots to not eagerly access the restore serializer when restoring
    state. This needs to be avoided since when restoring from 1.6, the
    restore serializer might not be available; for RocksDB, this should be
    tolerable.
---
 .../RegisteredBroadcastStateBackendMetaInfo.java   |  84 +++++--
 .../RegisteredKeyValueStateBackendMetaInfo.java    | 115 ++++++----
 .../RegisteredOperatorStateBackendMetaInfo.java    |  46 +++-
 ...egisteredPriorityQueueStateBackendMetaInfo.java |  39 +++-
 .../runtime/state/StateSerializerProvider.java     | 245 +++++++++++++++++++++
 .../flink/runtime/state/StateBackendTestBase.java  |  51 +++++
 .../runtime/state/StateSerializerProviderTest.java | 187 ++++++++++++++++
 .../streaming/state/RocksDBKeyedStateBackend.java  | 125 +++++------
 8 files changed, 739 insertions(+), 153 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
index 70a1414..95a650e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,11 +40,11 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> 
extends RegisteredSta
 
        /** The type serializer for the keys in the map state. */
        @Nonnull
-       private final TypeSerializer<K> keySerializer;
+       private final StateSerializerProvider<K> keySerializerProvider;
 
        /** The type serializer for the values in the map state. */
        @Nonnull
-       private final TypeSerializer<V> valueSerializer;
+       private final StateSerializerProvider<V> valueSerializerProvider;
 
        public RegisteredBroadcastStateBackendMetaInfo(
                        @Nonnull final String name,
@@ -50,19 +52,19 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> 
extends RegisteredSta
                        @Nonnull final TypeSerializer<K> keySerializer,
                        @Nonnull final TypeSerializer<V> valueSerializer) {
 
-               super(name);
-               Preconditions.checkArgument(assignmentMode == 
OperatorStateHandle.Mode.BROADCAST);
-               this.assignmentMode = assignmentMode;
-               this.keySerializer = keySerializer;
-               this.valueSerializer = valueSerializer;
+               this(
+                       name,
+                       assignmentMode,
+                       StateSerializerProvider.fromNewState(keySerializer),
+                       StateSerializerProvider.fromNewState(valueSerializer));
        }
 
        public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
                this(
                        Preconditions.checkNotNull(copy).name,
                        copy.assignmentMode,
-                       copy.keySerializer.duplicate(),
-                       copy.valueSerializer.duplicate());
+                       copy.getKeySerializer().duplicate(),
+                       copy.getValueSerializer().duplicate());
        }
 
        @SuppressWarnings("unchecked")
@@ -71,10 +73,13 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> 
extends RegisteredSta
                        snapshot.getName(),
                        OperatorStateHandle.Mode.valueOf(
                                
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-                       (TypeSerializer<K>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
-                       (TypeSerializer<V>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<K>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<V>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == 
snapshot.getBackendStateType());
        }
 
@@ -86,6 +91,19 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> 
extends RegisteredSta
                return new RegisteredBroadcastStateBackendMetaInfo<>(this);
        }
 
+       private RegisteredBroadcastStateBackendMetaInfo(
+               @Nonnull final String name,
+               @Nonnull final OperatorStateHandle.Mode assignmentMode,
+               @Nonnull final StateSerializerProvider<K> keySerializerProvider,
+               @Nonnull final StateSerializerProvider<V> 
valueSerializerProvider) {
+
+               super(name);
+               Preconditions.checkArgument(assignmentMode == 
OperatorStateHandle.Mode.BROADCAST);
+               this.assignmentMode = assignmentMode;
+               this.keySerializerProvider = keySerializerProvider;
+               this.valueSerializerProvider = valueSerializerProvider;
+       }
+
        @Nonnull
        @Override
        public StateMetaInfoSnapshot snapshot() {
@@ -94,12 +112,32 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> 
extends RegisteredSta
 
        @Nonnull
        public TypeSerializer<K> getKeySerializer() {
-               return keySerializer;
+               return keySerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<K> 
updateKeySerializer(TypeSerializer<K> newKeySerializer) {
+               return 
keySerializerProvider.registerNewSerializerForRestoredState(newKeySerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<K> getPreviousKeySerializer() {
+               return keySerializerProvider.previousSchemaSerializer();
        }
 
        @Nonnull
        public TypeSerializer<V> getValueSerializer() {
-               return valueSerializer;
+               return valueSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<V> 
updateValueSerializer(TypeSerializer<V> newValueSerializer) {
+               return 
valueSerializerProvider.registerNewSerializerForRestoredState(newValueSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<V> getPreviousValueSerializer() {
+               return valueSerializerProvider.previousSchemaSerializer();
        }
 
        @Nonnull
@@ -122,16 +160,16 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, 
V> extends RegisteredSta
 
                return Objects.equals(name, other.getName())
                                && Objects.equals(assignmentMode, 
other.getAssignmentMode())
-                               && Objects.equals(keySerializer, 
other.getKeySerializer())
-                               && Objects.equals(valueSerializer, 
other.getValueSerializer());
+                               && Objects.equals(getKeySerializer(), 
other.getKeySerializer())
+                               && Objects.equals(getValueSerializer(), 
other.getValueSerializer());
        }
 
        @Override
        public int hashCode() {
                int result = name.hashCode();
                result = 31 * result + assignmentMode.hashCode();
-               result = 31 * result + keySerializer.hashCode();
-               result = 31 * result + valueSerializer.hashCode();
+               result = 31 * result + getKeySerializer().hashCode();
+               result = 31 * result + getValueSerializer().hashCode();
                return result;
        }
 
@@ -139,8 +177,8 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> 
extends RegisteredSta
        public String toString() {
                return "RegisteredBroadcastBackendStateMetaInfo{" +
                                "name='" + name + '\'' +
-                               ", keySerializer=" + keySerializer +
-                               ", valueSerializer=" + valueSerializer +
+                               ", keySerializer=" + getKeySerializer() +
+                               ", valueSerializer=" + getValueSerializer() +
                                ", assignmentMode=" + assignmentMode +
                                '}';
        }
@@ -154,8 +192,12 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> 
extends RegisteredSta
                Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshotsMap = new HashMap<>(2);
                String keySerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
                String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+
+               TypeSerializer<K> keySerializer = getKeySerializer();
                serializerMap.put(keySerializerKey, keySerializer.duplicate());
                serializerConfigSnapshotsMap.put(keySerializerKey, 
keySerializer.snapshotConfiguration());
+
+               TypeSerializer<V> valueSerializer = getValueSerializer();
                serializerMap.put(valueSerializerKey, 
valueSerializer.duplicate());
                serializerConfigSnapshotsMap.put(valueSerializerKey, 
valueSerializer.snapshotConfiguration());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index d05f31a..ebe8e94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
@@ -44,18 +45,24 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
        @Nonnull
        private final StateDescriptor.Type stateType;
        @Nonnull
-       private final TypeSerializer<N> namespaceSerializer;
+       private final StateSerializerProvider<N> namespaceSerializerProvider;
        @Nonnull
-       private final TypeSerializer<S> stateSerializer;
+       private final StateSerializerProvider<S> stateSerializerProvider;
        @Nullable
-       private final StateSnapshotTransformer<S> snapshotTransformer;
+       private StateSnapshotTransformer<S> snapshotTransformer;
 
        public RegisteredKeyValueStateBackendMetaInfo(
                @Nonnull StateDescriptor.Type stateType,
                @Nonnull String name,
                @Nonnull TypeSerializer<N> namespaceSerializer,
                @Nonnull TypeSerializer<S> stateSerializer) {
-               this(stateType, name, namespaceSerializer, stateSerializer, 
null);
+
+               this(
+                       stateType,
+                       name,
+                       
StateSerializerProvider.fromNewState(namespaceSerializer),
+                       StateSerializerProvider.fromNewState(stateSerializer),
+                       null);
        }
 
        public RegisteredKeyValueStateBackendMetaInfo(
@@ -65,11 +72,12 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
                @Nonnull TypeSerializer<S> stateSerializer,
                @Nullable StateSnapshotTransformer<S> snapshotTransformer) {
 
-               super(name);
-               this.stateType = stateType;
-               this.namespaceSerializer = namespaceSerializer;
-               this.stateSerializer = stateSerializer;
-               this.snapshotTransformer = snapshotTransformer;
+               this(
+                       stateType,
+                       name,
+                       
StateSerializerProvider.fromNewState(namespaceSerializer),
+                       StateSerializerProvider.fromNewState(stateSerializer),
+                       snapshotTransformer);
        }
 
        @SuppressWarnings("unchecked")
@@ -77,13 +85,31 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
                this(
                        
StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
                        snapshot.getName(),
-                       (TypeSerializer<N>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
-                       (TypeSerializer<S>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
 null);
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<N>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<S>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
+                       null);
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == 
snapshot.getBackendStateType());
        }
 
+       private RegisteredKeyValueStateBackendMetaInfo(
+               @Nonnull StateDescriptor.Type stateType,
+               @Nonnull String name,
+               @Nonnull StateSerializerProvider<N> namespaceSerializerProvider,
+               @Nonnull StateSerializerProvider<S> stateSerializerProvider,
+               @Nullable StateSnapshotTransformer<S> snapshotTransformer) {
+
+               super(name);
+               this.stateType = stateType;
+               this.namespaceSerializerProvider = namespaceSerializerProvider;
+               this.stateSerializerProvider = stateSerializerProvider;
+               this.snapshotTransformer = snapshotTransformer;
+       }
+
        @Nonnull
        public StateDescriptor.Type getStateType() {
                return stateType;
@@ -91,12 +117,32 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
 
        @Nonnull
        public TypeSerializer<N> getNamespaceSerializer() {
-               return namespaceSerializer;
+               return namespaceSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<N> 
updateNamespaceSerializer(TypeSerializer<N> newNamespaceSerializer) {
+               return 
namespaceSerializerProvider.registerNewSerializerForRestoredState(newNamespaceSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<N> getPreviousNamespaceSerializer() {
+               return namespaceSerializerProvider.previousSchemaSerializer();
        }
 
        @Nonnull
        public TypeSerializer<S> getStateSerializer() {
-               return stateSerializer;
+               return stateSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<S> 
updateStateSerializer(TypeSerializer<S> newStateSerializer) {
+               return 
stateSerializerProvider.registerNewSerializerForRestoredState(newStateSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<S> getPreviousStateSerializer() {
+               return stateSerializerProvider.previousSchemaSerializer();
        }
 
        @Nullable
@@ -104,6 +150,10 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
                return snapshotTransformer;
        }
 
+       public void updateSnapshotTransformer(StateSnapshotTransformer<S> 
snapshotTransformer) {
+               this.snapshotTransformer = snapshotTransformer;
+       }
+
        @Override
        public boolean equals(Object o) {
                if (this == o) {
@@ -133,8 +183,8 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
                return "RegisteredKeyedBackendStateMetaInfo{" +
                                "stateType=" + stateType +
                                ", name='" + name + '\'' +
-                               ", namespaceSerializer=" + namespaceSerializer +
-                               ", stateSerializer=" + stateSerializer +
+                               ", namespaceSerializer=" + 
getNamespaceSerializer() +
+                               ", stateSerializer=" + getStateSerializer() +
                                '}';
        }
 
@@ -153,34 +203,19 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
                return computeSnapshot();
        }
 
-       public static void checkStateMetaInfo(StateMetaInfoSnapshot 
stateMetaInfoSnapshot, StateDescriptor<?, ?> stateDesc) {
-               Preconditions.checkState(
-                       stateMetaInfoSnapshot != null,
-                       "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                               " but its corresponding restored snapshot 
cannot be found.");
-
-               
Preconditions.checkState(stateMetaInfoSnapshot.getBackendStateType()
-                               == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
-                       "Incompatible state types. " +
-                               "Was [" + 
stateMetaInfoSnapshot.getBackendStateType() + "], " +
-                               "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+       public void checkStateMetaInfo(StateDescriptor<?, ?> stateDesc) {
 
                Preconditions.checkState(
-                       Objects.equals(stateDesc.getName(), 
stateMetaInfoSnapshot.getName()),
+                       Objects.equals(stateDesc.getName(), getName()),
                        "Incompatible state names. " +
-                               "Was [" + stateMetaInfoSnapshot.getName() + "], 
" +
+                               "Was [" + getName() + "], " +
                                "registered with [" + stateDesc.getName() + 
"].");
 
-               final StateDescriptor.Type restoredType =
-                       StateDescriptor.Type.valueOf(
-                               stateMetaInfoSnapshot.getOption(
-                                       
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
-
-               if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && 
restoredType != StateDescriptor.Type.UNKNOWN) {
+               if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && 
getStateType() != StateDescriptor.Type.UNKNOWN) {
                        Preconditions.checkState(
-                               stateDesc.getType() == restoredType,
+                               stateDesc.getType() == getStateType(),
                                "Incompatible key/value state types. " +
-                                       "Was [" + restoredType + "], " +
+                                       "Was [" + getStateType() + "], " +
                                        "registered with [" + 
stateDesc.getType() + "].");
                }
        }
@@ -194,8 +229,12 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> 
extends RegisteredStat
                Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshotsMap = new HashMap<>(2);
                String namespaceSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
                String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+
+               TypeSerializer<N> namespaceSerializer = 
getNamespaceSerializer();
                serializerMap.put(namespaceSerializerKey, 
namespaceSerializer.duplicate());
                serializerConfigSnapshotsMap.put(namespaceSerializerKey, 
namespaceSerializer.snapshotConfiguration());
+
+               TypeSerializer<S> stateSerializer = getStateSerializer();
                serializerMap.put(valueSerializerKey, 
stateSerializer.duplicate());
                serializerConfigSnapshotsMap.put(valueSerializerKey, 
stateSerializer.snapshotConfiguration());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
index 10ba029..afb3d77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.Map;
@@ -46,21 +48,22 @@ public class RegisteredOperatorStateBackendMetaInfo<S> 
extends RegisteredStateMe
         * The type serializer for the elements in the state list
         */
        @Nonnull
-       private final TypeSerializer<S> partitionStateSerializer;
+       private final StateSerializerProvider<S> 
partitionStateSerializerProvider;
 
        public RegisteredOperatorStateBackendMetaInfo(
                        @Nonnull String name,
                        @Nonnull TypeSerializer<S> partitionStateSerializer,
                        @Nonnull OperatorStateHandle.Mode assignmentMode) {
-               super(name);
-               this.partitionStateSerializer = partitionStateSerializer;
-               this.assignmentMode = assignmentMode;
+               this(
+                       name,
+                       
StateSerializerProvider.fromNewState(partitionStateSerializer),
+                       assignmentMode);
        }
 
        private RegisteredOperatorStateBackendMetaInfo(@Nonnull 
RegisteredOperatorStateBackendMetaInfo<S> copy) {
                this(
                        Preconditions.checkNotNull(copy).name,
-                       copy.partitionStateSerializer.duplicate(),
+                       copy.getPartitionStateSerializer().duplicate(),
                        copy.assignmentMode);
        }
 
@@ -68,13 +71,24 @@ public class RegisteredOperatorStateBackendMetaInfo<S> 
extends RegisteredStateMe
        public RegisteredOperatorStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
                this(
                        snapshot.getName(),
-                       (TypeSerializer<S>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<S>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
                        OperatorStateHandle.Mode.valueOf(
                                
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == 
snapshot.getBackendStateType());
        }
 
+       private RegisteredOperatorStateBackendMetaInfo(
+                       @Nonnull String name,
+                       @Nonnull StateSerializerProvider<S> 
partitionStateSerializerProvider,
+                       @Nonnull OperatorStateHandle.Mode assignmentMode) {
+               super(name);
+               this.partitionStateSerializerProvider = 
partitionStateSerializerProvider;
+               this.assignmentMode = assignmentMode;
+       }
+
        /**
         * Creates a deep copy of the itself.
         */
@@ -96,7 +110,17 @@ public class RegisteredOperatorStateBackendMetaInfo<S> 
extends RegisteredStateMe
 
        @Nonnull
        public TypeSerializer<S> getPartitionStateSerializer() {
-               return partitionStateSerializer;
+               return 
partitionStateSerializerProvider.currentSchemaSerializer();
+       }
+
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<S> 
updatePartitionStateSerializer(TypeSerializer<S> newPartitionStateSerializer) {
+               return 
partitionStateSerializerProvider.registerNewSerializerForRestoredState(newPartitionStateSerializer);
+       }
+
+       @Nullable
+       public TypeSerializer<S> getPreviousPartitionStateSerializer() {
+               return 
partitionStateSerializerProvider.previousSchemaSerializer();
        }
 
        @Override
@@ -112,7 +136,7 @@ public class RegisteredOperatorStateBackendMetaInfo<S> 
extends RegisteredStateMe
                return (obj instanceof RegisteredOperatorStateBackendMetaInfo)
                        && 
name.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getName())
                        && 
assignmentMode.equals(((RegisteredOperatorStateBackendMetaInfo) 
obj).getAssignmentMode())
-                       && 
partitionStateSerializer.equals(((RegisteredOperatorStateBackendMetaInfo) 
obj).getPartitionStateSerializer());
+                       && 
getPartitionStateSerializer().equals(((RegisteredOperatorStateBackendMetaInfo) 
obj).getPartitionStateSerializer());
        }
 
        @Override
@@ -128,7 +152,7 @@ public class RegisteredOperatorStateBackendMetaInfo<S> 
extends RegisteredStateMe
                return "RegisteredOperatorBackendStateMetaInfo{" +
                        "name='" + name + "\'" +
                        ", assignmentMode=" + assignmentMode +
-                       ", partitionStateSerializer=" + 
partitionStateSerializer +
+                       ", partitionStateSerializer=" + 
getPartitionStateSerializer() +
                        '}';
        }
 
@@ -138,6 +162,8 @@ public class RegisteredOperatorStateBackendMetaInfo<S> 
extends RegisteredStateMe
                        
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
                        assignmentMode.toString());
                String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+
+               TypeSerializer<S> partitionStateSerializer = 
getPartitionStateSerializer();
                Map<String, TypeSerializer<?>> serializerMap =
                        Collections.singletonMap(valueSerializerKey, 
partitionStateSerializer.duplicate());
                Map<String, TypeSerializerSnapshot<?>> 
serializerConfigSnapshotsMap =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
index 0304b92..60c88e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.Map;
@@ -34,24 +36,34 @@ import java.util.Map;
 public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends 
RegisteredStateMetaInfoBase {
 
        @Nonnull
-       private final TypeSerializer<T> elementSerializer;
+       private final StateSerializerProvider<T> elementSerializerProvider;
 
        public RegisteredPriorityQueueStateBackendMetaInfo(
                @Nonnull String name,
                @Nonnull TypeSerializer<T> elementSerializer) {
 
-               super(name);
-               this.elementSerializer = elementSerializer;
+               this(name, 
StateSerializerProvider.fromNewState(elementSerializer));
        }
 
        @SuppressWarnings("unchecked")
        public 
RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
-               this(snapshot.getName(),
-                       (TypeSerializer<T>) Preconditions.checkNotNull(
-                               
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+               this(
+                       snapshot.getName(),
+                       StateSerializerProvider.fromRestoredState(
+                               (TypeSerializerSnapshot<T>) 
Preconditions.checkNotNull(
+                                       
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
+
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE 
== snapshot.getBackendStateType());
        }
 
+       private RegisteredPriorityQueueStateBackendMetaInfo(
+               @Nonnull String name,
+               @Nonnull StateSerializerProvider<T> elementSerializerProvider) {
+
+               super(name);
+               this.elementSerializerProvider = elementSerializerProvider;
+       }
+
        @Nonnull
        @Override
        public StateMetaInfoSnapshot snapshot() {
@@ -60,10 +72,21 @@ public class RegisteredPriorityQueueStateBackendMetaInfo<T> 
extends RegisteredSt
 
        @Nonnull
        public TypeSerializer<T> getElementSerializer() {
-               return elementSerializer;
+               return elementSerializerProvider.currentSchemaSerializer();
+       }
+
+       /**
+        * Registers a new element serializer by delegating to
+        * {@link StateSerializerProvider#registerNewSerializerForRestoredState(TypeSerializer)}.
+        *
+        * @param newElementSerializer the element serializer to register.
+        * @return the schema compatibility result reported by the provider.
+        * @throws UnsupportedOperationException if this meta info was created with a serializer instead of a
+        *         restored snapshot, or if a new serializer has already been registered.
+        */
+       @Nonnull
+       public TypeSerializerSchemaCompatibility<T> 
updateElementSerializer(TypeSerializer<T> newElementSerializer) {
+               return 
elementSerializerProvider.registerNewSerializerForRestoredState(newElementSerializer);
+       }
+
+       /**
+        * Returns the serializer for the previous schema of the elements, as provided by
+        * {@link StateSerializerProvider#previousSchemaSerializer()}.
+        *
+        * <p>NOTE(review): this method is annotated {@code @Nullable}, but the provider method it delegates to
+        * is declared {@code @Nonnull} and throws {@link UnsupportedOperationException} when the provider was
+        * created from new state - confirm which nullability contract is intended.
+        */
+       @Nullable
+       public TypeSerializer<T> getPreviousElementSerializer() {
+               return elementSerializerProvider.previousSchemaSerializer();
        }
 
        private StateMetaInfoSnapshot computeSnapshot() {
+               TypeSerializer<T> elementSerializer = getElementSerializer();
                Map<String, TypeSerializer<?>> serializerMap =
                        Collections.singletonMap(
                                
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
@@ -82,6 +105,6 @@ public class RegisteredPriorityQueueStateBackendMetaInfo<T> 
extends RegisteredSt
        }
 
        public RegisteredPriorityQueueStateBackendMetaInfo deepCopy() {
-               return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, 
elementSerializer.duplicate());
+               return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, 
getElementSerializer().duplicate());
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
new file mode 100644
index 0000000..a24f12e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StateSerializerProvider} wraps logic on how to obtain serializers for registered state,
+ * either with the previous schema of state in checkpoints or the current schema of state.
+ *
+ * @param <T> the type of the state.
+ */
+@Internal
+public abstract class StateSerializerProvider<T> {
+
+       /**
+        * The registered serializer for the state.
+        *
+        * <p>In the case that this provider was created from a restored serializer snapshot via
+        * {@link #fromRestoredState(TypeSerializerSnapshot)}, but a new serializer was never registered
+        * for the state (i.e., this is the case if a restored state was never accessed), this would be {@code null}.
+        */
+       @Nullable
+       TypeSerializer<T> registeredSerializer;
+
+       /**
+        * Creates a {@link StateSerializerProvider} for restored state from the previous serializer's snapshot.
+        *
+        * <p>Once a new serializer is registered for the state, it should be provided via
+        * the {@link #registerNewSerializerForRestoredState(TypeSerializer)} method.
+        *
+        * @param stateSerializerSnapshot the previous serializer's snapshot.
+        * @param <T> the type of the state.
+        *
+        * @return a new {@link StateSerializerProvider} for restored state.
+        */
+       public static <T> StateSerializerProvider<T> fromRestoredState(TypeSerializerSnapshot<T> stateSerializerSnapshot) {
+               return new RestoredStateSerializerProvider<>(stateSerializerSnapshot);
+       }
+
+       /**
+        * Creates a {@link StateSerializerProvider} for new state from the registered state serializer.
+        *
+        * @param registeredStateSerializer the new state's registered serializer.
+        * @param <T> the type of the state.
+        *
+        * @return a new {@link StateSerializerProvider} for new state.
+        */
+       public static <T> StateSerializerProvider<T> fromNewState(TypeSerializer<T> registeredStateSerializer) {
+               return new NewStateSerializerProvider<>(registeredStateSerializer);
+       }
+
+       /**
+        * Private so that the only implementations are the nested classes in this file.
+        *
+        * @param stateSerializer the initially registered serializer; {@code null} for restored state.
+        */
+       private StateSerializerProvider(@Nullable TypeSerializer<T> stateSerializer) {
+               this.registeredSerializer = stateSerializer;
+       }
+
+       /**
+        * Gets the serializer that recognizes the current serialization schema of the state.
+        * This is the serializer that should be used for regular state serialization and
+        * deserialization after state has been restored.
+        *
+        * <p>If this provider was created from a restored state's serializer snapshot, while a
+        * new serializer (with a new schema) was not registered for the state (i.e., because
+        * the state was never accessed after it was restored), then the schema of state remains
+        * identical. Therefore, in this case, it is guaranteed that the serializer returned by
+        * this method is the same as the one returned by {@link #previousSchemaSerializer()}.
+        *
+        * <p>If this provider was created from new state, then this always returns the
+        * serializer that the new state was registered with.
+        *
+        * @return a serializer that reads and writes in the current schema of the state.
+        */
+       @Nonnull
+       public abstract TypeSerializer<T> currentSchemaSerializer();
+
+       /**
+        * Gets the serializer that recognizes the previous serialization schema of the state.
+        * This is the serializer that should be used for restoring the state, i.e. when the state
+        * is still in the previous serialization schema.
+        *
+        * <p>This method can only be used if this provider was created from a restored state's serializer
+        * snapshot. If this provider was created from new state, then this method is
+        * irrelevant, since there doesn't exist any previous version of the state schema.
+        *
+        * @return a serializer that reads and writes in the previous schema of the state.
+        *
+        * @throws UnsupportedOperationException if this provider was created from new state.
+        */
+       @Nonnull
+       public abstract TypeSerializer<T> previousSchemaSerializer();
+
+       /**
+        * For restored state, register a new serializer that potentially has a new serialization schema.
+        *
+        * <p>Users are allowed to register serializers for state only once. Therefore, this method
+        * is irrelevant if this provider was created from new state, since a state serializer had
+        * been registered already.
+        *
+        * <p>For the case where this provider was created from restored state, then this method should
+        * be called at most once. The new serializer will be checked for its schema compatibility with the
+        * previous serializer's schema, and returned to the caller. The caller is responsible for
+        * checking the result and react appropriately to it, as follows:
+        * <ul>
+        *     <li>{@link TypeSerializerSchemaCompatibility#isCompatibleAsIs()}: nothing needs to be done.
+        *     {@link #currentSchemaSerializer()} now returns the newly registered serializer.</li>
+        *     <li>{@link TypeSerializerSchemaCompatibility#isCompatibleAfterMigration()}: state needs to be
+        *     migrated before the serializer returned by {@link #currentSchemaSerializer()} can be used.
+        *     The migration should be performed by reading the state with {@link #previousSchemaSerializer()},
+        *     and then writing it again with {@link #currentSchemaSerializer()}.</li>
+        *     <li>{@link TypeSerializerSchemaCompatibility#isIncompatible()}: the registered serializer is
+        *     incompatible. {@link #currentSchemaSerializer()} can no longer return a serializer for
+        *     the state, and therefore this provider shouldn't be used anymore.</li>
+        * </ul>
+        *
+        * @param newSerializer the new serializer to register for the restored state.
+        *
+        * @return the schema compatibility of the new registered serializer, with respect to the previous serializer.
+        *
+        * @throws UnsupportedOperationException if a serializer has already been registered, which is
+        *         always the case for a provider that was created from new state.
+        */
+       @Nonnull
+       public abstract TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer);
+
+       /**
+        * Implementation of the {@link StateSerializerProvider} for the restored state case.
+        */
+       private static class RestoredStateSerializerProvider<T> extends StateSerializerProvider<T> {
+
+               /**
+                * The snapshot of the previous serializer of the state.
+                */
+               @Nonnull
+               private final TypeSerializerSnapshot<T> previousSerializerSnapshot;
+
+               /**
+                * Set once a new serializer was registered whose schema the previous serializer's
+                * snapshot reported as incompatible.
+                */
+               private boolean isRegisteredWithIncompatibleSerializer = false;
+
+               RestoredStateSerializerProvider(TypeSerializerSnapshot<T> previousSerializerSnapshot) {
+                       super(null);
+                       this.previousSerializerSnapshot = Preconditions.checkNotNull(previousSerializerSnapshot);
+               }
+
+               /**
+                * The restore serializer, lazily created only when the restore serializer is accessed.
+                *
+                * <p>NOTE: It is important to only create this lazily, so that off-heap
+                * state does not fail eagerly when restoring state that has a
+                * {@link UnloadableDummyTypeSerializer} as the previous serializer. This should
+                * be relevant only for restores from Flink versions prior to 1.7.x.
+                */
+               @Nullable
+               private TypeSerializer<T> cachedRestoredSerializer;
+
+               @Override
+               @Nonnull
+               public TypeSerializer<T> currentSchemaSerializer() {
+                       if (registeredSerializer != null) {
+                               checkState(
+                                       !isRegisteredWithIncompatibleSerializer,
+                                       "Unable to provide a serializer with the current schema, because the restored state was " +
+                                               "registered with a new serializer that has incompatible schema.");
+
+                               return registeredSerializer;
+                       }
+
+                       // if we are not yet registered with a new serializer,
+                       // we can just use the restore serializer to read / write the state.
+                       return previousSchemaSerializer();
+               }
+
+               @Override
+               @Nonnull
+               public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
+                       checkNotNull(newSerializer);
+                       if (registeredSerializer != null) {
+                               throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed.");
+                       }
+
+                       TypeSerializerSchemaCompatibility<T> result = previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
+                       if (result.isIncompatible()) {
+                               this.isRegisteredWithIncompatibleSerializer = true;
+                       }
+                       // the serializer is remembered even when incompatible, so that a second registration is rejected
+                       this.registeredSerializer = newSerializer;
+                       return result;
+               }
+
+               @Override
+               @Nonnull
+               public final TypeSerializer<T> previousSchemaSerializer() {
+                       // restore the serializer on first access only, then keep handing out the cached instance
+                       if (cachedRestoredSerializer == null) {
+                               this.cachedRestoredSerializer = previousSerializerSnapshot.restoreSerializer();
+                       }
+                       return cachedRestoredSerializer;
+               }
+       }
+
+       /**
+        * Implementation of the {@link StateSerializerProvider} for the new state case.
+        */
+       private static class NewStateSerializerProvider<T> extends StateSerializerProvider<T> {
+
+               NewStateSerializerProvider(TypeSerializer<T> registeredStateSerializer) {
+                       super(Preconditions.checkNotNull(registeredStateSerializer));
+               }
+
+               @Override
+               @Nonnull
+               @SuppressWarnings("ConstantConditions")
+               public TypeSerializer<T> currentSchemaSerializer() {
+                       return registeredSerializer;
+               }
+
+               @Override
+               @Nonnull
+               public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
+                       throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed.");
+               }
+
+               @Override
+               @Nonnull
+               public TypeSerializer<T> previousSchemaSerializer() {
+                       throw new UnsupportedOperationException("This is a NewStateSerializerProvider; you cannot get a restore serializer because there was no restored state.");
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 54cac1e..8708613 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -3145,6 +3145,57 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                backend.dispose();
        }
 
+       /**
+        * Verifies that keyed state which is restored but never accessed survives a further
+        * snapshot/restore cycle: map state is written and snapshotted, the backend is restored and
+        * snapshotted again without the state being accessed, and the values are finally read back
+        * from a backend that was restored from that second snapshot.
+        */
+       @Test
+       public void testSnapshotNonAccessedState() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               AbstractKeyedStateBackend<String> backend = 
createKeyedBackend(StringSerializer.INSTANCE);
+
+               final String stateName = "test-name";
+               try {
+                       MapStateDescriptor<Integer, String> kvId = new 
MapStateDescriptor<>(stateName, Integer.class, String.class);
+                       MapState<Integer, String> mapState = backend
+                               .getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // write some state to be snapshotted
+                       backend.setCurrentKey("1");
+                       mapState.put(11, "foo");
+                       backend.setCurrentKey("2");
+                       mapState.put(8, "bar");
+                       backend.setCurrentKey("3");
+                       mapState.put(91, "hello world");
+
+                       // take a snapshot, and then restore backend with 
snapshot
+                       KeyedStateHandle snapshot = runSnapshot(
+                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                               sharedStateRegistry);
+                       backend.dispose();
+
+                       backend = 
restoreKeyedBackend(StringSerializer.INSTANCE, snapshot);
+
+                       // now take a snapshot again without accessing the state
+                       snapshot = runSnapshot(
+                               backend.snapshot(2L, 3L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                               sharedStateRegistry);
+                       backend.dispose();
+
+                       // we restore again and try to access previous state
+                       backend = 
restoreKeyedBackend(StringSerializer.INSTANCE, snapshot);
+                       mapState = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       backend.setCurrentKey("1");
+                       assertEquals("foo", mapState.get(11));
+                       backend.setCurrentKey("2");
+                       assertEquals("bar", mapState.get(8));
+                       backend.setCurrentKey("3");
+                       assertEquals("hello world", mapState.get(91));
+
+                       snapshot.discardState();
+               } finally {
+                       backend.dispose();
+               }
+       }
+
        /**
         * This test verifies that state is correctly assigned to key groups 
and that restore
         * restores the relevant key groups in the backend.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
new file mode 100644
index 0000000..de1f2bd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test suite for {@link StateSerializerProvider}.
+ */
+public class StateSerializerProviderTest {
+
+       // --------------------------------------------------------------------------------
+       //  Tests for #currentSchemaSerializer()
+       // --------------------------------------------------------------------------------
+
+       @Test
+       public void testCurrentSchemaSerializerForNewStateSerializerProvider() {
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+               assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testCurrentSchemaSerializerForRestoredStateSerializerProvider() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+       }
+
+       // --------------------------------------------------------------------------------
+       //  Tests for #previousSchemaSerializer()
+       // --------------------------------------------------------------------------------
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testPreviousSchemaSerializerForNewStateSerializerProvider() {
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+
+               // this should fail with an exception
+               testProvider.previousSchemaSerializer();
+       }
+
+       @Test
+       public void testPreviousSchemaSerializerForRestoredStateSerializerProvider() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testLazyInstantiationOfPreviousSchemaSerializer() {
+               // create the provider with an exception throwing snapshot;
+               // this would throw an exception if the restore serializer was eagerly accessed
+               StateSerializerProvider<String> testProvider =
+                       StateSerializerProvider.fromRestoredState(new ExceptionThrowingSerializerSnapshot());
+
+               try {
+                       // if we fail here, that means the restore serializer was indeed lazily accessed
+                       testProvider.previousSchemaSerializer();
+                       fail("expected to fail when accessing the restore serializer.");
+               } catch (Exception expected) {
+                       // success
+               }
+       }
+
+       // --------------------------------------------------------------------------------
+       //  Tests for #registerNewSerializerForRestoredState(TypeSerializer)
+       // --------------------------------------------------------------------------------
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisterNewSerializerWithNewStateSerializerProviderShouldFail() {
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+               testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
+       }
+
+       // NOTE: despite "NewStateSerializerProvider" in its name, this test exercises a provider
+       // that was created via fromRestoredState(...); only the second registration is expected to fail.
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisterNewSerializerTwiceWithNewStateSerializerProviderShouldFail() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
+
+               // second registration should fail
+               testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
+       }
+
+       @Test
+       public void testRegisterNewCompatibleAsIsSerializer() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               // register compatible serializer for state
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+                       testProvider.registerNewSerializerForRestoredState(new TestType.V1TestTypeSerializer());
+               assertTrue(schemaCompatibility.isCompatibleAsIs());
+
+               assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+               assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testRegisterNewCompatibleAfterMigrationSerializer() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               // register serializer that requires migration for state
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+                       testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer());
+               assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+       }
+
+       @Test
+       public void testRegisterIncompatibleSerializer() {
+               TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+
+               // register serializer that is incompatible with the restored state
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility =
+                       testProvider.registerNewSerializerForRestoredState(new TestType.IncompatibleTestTypeSerializer());
+               assertTrue(schemaCompatibility.isIncompatible());
+
+               try {
+                       // a serializer for the current schema will no longer be accessible
+                       testProvider.currentSchemaSerializer();
+                       // fail() raises an AssertionError, which the catch below does not swallow
+                       fail("expected to fail when accessing the current schema serializer after an incompatible registration.");
+               } catch (Exception expected) {
+                       // success
+               }
+       }
+
+       // --------------------------------------------------------------------------------
+       //  Utilities
+       // --------------------------------------------------------------------------------
+
+       /**
+        * A serializer snapshot whose every method throws {@link UnsupportedOperationException};
+        * used to detect any eager access to the snapshot.
+        */
+       public static class ExceptionThrowingSerializerSnapshot implements TypeSerializerSnapshot<String> {
+
+               @Override
+               public TypeSerializer<String> restoreSerializer() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void writeSnapshot(DataOutputView out) throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(TypeSerializer<String> newSerializer) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public int getCurrentVersion() {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d12370b..065213b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -1377,78 +1376,69 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        TypeSerializer<N> namespaceSerializer,
                        @Nullable StateSnapshotTransformer<SV> 
snapshotTransformer) throws Exception {
 
-               Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
stateInfo =
+               Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
oldStateInfo =
                        kvStateInformation.get(stateDesc.getName());
 
                TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
-               RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
-                       stateDesc.getType(),
-                       stateDesc.getName(),
-                       namespaceSerializer,
-                       stateSerializer,
-                       snapshotTransformer);
-
-               if (stateInfo != null) {
-                       newMetaInfo = migrateStateIfNecessary(
-                               newMetaInfo,
+
+               ColumnFamilyHandle newColumnFamily;
+               RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo;
+               if (oldStateInfo != null) {
+                       @SuppressWarnings("unchecked")
+                       RegisteredKeyValueStateBackendMetaInfo<N, SV> 
castedMetaInfo = (RegisteredKeyValueStateBackendMetaInfo<N, SV>) 
oldStateInfo.f1;
+
+                       newMetaInfo = updateRestoredStateMetaInfo(
+                               Tuple2.of(oldStateInfo.f0, castedMetaInfo),
                                stateDesc,
                                namespaceSerializer,
                                stateSerializer,
-                               stateInfo);
+                               snapshotTransformer);
 
-                       stateInfo.f1 = newMetaInfo;
+                       oldStateInfo.f1 = newMetaInfo;
+                       newColumnFamily = oldStateInfo.f0;
                } else {
-                       ColumnFamilyHandle columnFamily = 
createColumnFamily(stateDesc.getName());
+                       newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+                               stateDesc.getType(),
+                               stateDesc.getName(),
+                               namespaceSerializer,
+                               stateSerializer,
+                               snapshotTransformer);
 
-                       stateInfo = Tuple2.of(columnFamily, newMetaInfo);
-                       registerKvStateInformation(stateDesc.getName(), 
stateInfo);
+                       newColumnFamily = 
createColumnFamily(stateDesc.getName());
+                       registerKvStateInformation(stateDesc.getName(), 
Tuple2.of(newColumnFamily, newMetaInfo));
                }
 
-               return Tuple2.of(stateInfo.f0, newMetaInfo);
+               return Tuple2.of(newColumnFamily, newMetaInfo);
        }
 
-       private <N, S extends State, SV> 
RegisteredKeyValueStateBackendMetaInfo<N, SV> migrateStateIfNecessary(
-                       RegisteredKeyValueStateBackendMetaInfo<N, SV> 
newMetaInfo,
+       private <N, S extends State, SV> 
RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(
+                       Tuple2<ColumnFamilyHandle, 
RegisteredKeyValueStateBackendMetaInfo<N, SV>> oldStateInfo,
                        StateDescriptor<S, SV> stateDesc,
                        TypeSerializer<N> namespaceSerializer,
                        TypeSerializer<SV> stateSerializer,
-                       Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
stateInfo) throws Exception {
-
-               StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
-
-               Preconditions.checkState(
-                       restoredMetaInfoSnapshot != null,
-                       "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                               " but its corresponding restored snapshot 
cannot be found.");
+                       @Nullable StateSnapshotTransformer<SV> 
snapshotTransformer) throws Exception {
 
                @SuppressWarnings("unchecked")
-               TypeSerializerSnapshot<N> namespaceSerializerSnapshot = 
Preconditions.checkNotNull(
-                       (TypeSerializerSnapshot<N>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
+               RegisteredKeyValueStateBackendMetaInfo<N, SV> 
restoredKvStateMetaInfo = oldStateInfo.f1;
+
+               
restoredKvStateMetaInfo.updateSnapshotTransformer(snapshotTransformer);
 
-               TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
-                       
namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
-               if (!namespaceCompatibility.isCompatibleAsIs()) {
+               TypeSerializerSchemaCompatibility<N> s = 
restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
+               if (!s.isCompatibleAsIs()) {
                        throw new StateMigrationException("The new namespace 
serializer must be compatible.");
                }
 
-               @SuppressWarnings("unchecked")
-               TypeSerializerSnapshot<SV> stateSerializerSnapshot = 
Preconditions.checkNotNull(
-                       (TypeSerializerSnapshot<SV>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
-
-               
RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot,
 stateDesc);
+               restoredKvStateMetaInfo.checkStateMetaInfo(stateDesc);
 
-               TypeSerializerSchemaCompatibility<SV> stateCompatibility =
-                       
stateSerializerSnapshot.resolveSchemaCompatibility(stateSerializer);
-
-               if (stateCompatibility.isCompatibleAfterMigration()) {
-                       migrateStateValues(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo, stateSerializer);
-               } else if (stateCompatibility.isIncompatible()) {
+               TypeSerializerSchemaCompatibility<SV> 
newStateSerializerCompatibility =
+                       
restoredKvStateMetaInfo.updateStateSerializer(stateSerializer);
+               if 
(newStateSerializerCompatibility.isCompatibleAfterMigration()) {
+                       migrateStateValues(stateDesc, oldStateInfo);
+               } else if (newStateSerializerCompatibility.isIncompatible()) {
                        throw new StateMigrationException("The new state 
serializer cannot be incompatible.");
                }
 
-               return newMetaInfo;
+               return restoredKvStateMetaInfo;
        }
 
        /**
@@ -1458,10 +1448,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         */
        private <N, S extends State, SV> void migrateStateValues(
                StateDescriptor<S, SV> stateDesc,
-               Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> 
stateInfo,
-               StateMetaInfoSnapshot restoredMetaInfoSnapshot,
-               RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo,
-               TypeSerializer<SV> newStateSerializer) throws Exception {
+               Tuple2<ColumnFamilyHandle, 
RegisteredKeyValueStateBackendMetaInfo<N, SV>> stateMetaInfo) throws Exception {
 
                if (stateDesc.getType() == StateDescriptor.Type.MAP) {
                        throw new StateMigrationException("The new serializer 
for a MapState requires state migration in order for the job to proceed." +
@@ -1483,7 +1470,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                }
                State state = stateFactory.createState(
                        stateDesc,
-                       Tuple2.of(stateInfo.f0, newMetaInfo),
+                       stateMetaInfo,
                        RocksDBKeyedStateBackend.this);
                if (!(state instanceof AbstractRocksDBState)) {
                        throw new FlinkRuntimeException(
@@ -1495,16 +1482,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
                Snapshot rocksDBSnapshot = db.getSnapshot();
                try (
-                       RocksIteratorWrapper iterator = getRocksIterator(db, 
stateInfo.f0);
+                       RocksIteratorWrapper iterator = getRocksIterator(db, 
stateMetaInfo.f0);
                        RocksDBWriteBatchWrapper batchWriter = new 
RocksDBWriteBatchWrapper(db, getWriteOptions())
                ) {
                        iterator.seekToFirst();
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<SV> priorValueSerializerSnapshot 
= (TypeSerializerSnapshot<SV>)
-                               
Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-                       TypeSerializer<SV> priorValueSerializer = 
priorValueSerializerSnapshot.restoreSerializer();
-
                        DataInputDeserializer serializedValueInput = new 
DataInputDeserializer();
                        DataOutputSerializer migratedSerializedValueOutput = 
new DataOutputSerializer(512);
                        while (iterator.isValid()) {
@@ -1513,10 +1495,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                rocksDBState.migrateSerializedValue(
                                        serializedValueInput,
                                        migratedSerializedValueOutput,
-                                       priorValueSerializer,
-                                       newStateSerializer);
+                                       
stateMetaInfo.f1.getPreviousStateSerializer(),
+                                       stateMetaInfo.f1.getStateSerializer());
 
-                               batchWriter.put(stateInfo.f0, iterator.key(), 
migratedSerializedValueOutput.getCopyOfBuffer());
+                               batchWriter.put(stateMetaInfo.f0, 
iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer());
 
                                migratedSerializedValueOutput.clear();
                                iterator.next();
@@ -1710,25 +1692,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        // TODO we implement the simple way of supporting the 
current functionality, mimicking keyed state
                        // because this should be reworked in FLINK-9376 and 
then we should have a common algorithm over
                        // StateMetaInfoSnapshot that avoids this code 
duplication.
-                       StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateName);
-
-                       Preconditions.checkState(
-                               restoredMetaInfoSnapshot != null,
-                               "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                                       " but its corresponding restored 
snapshot cannot be found.");
 
-                       StateMetaInfoSnapshot.CommonSerializerKeys 
serializerKey =
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
-
-                       TypeSerializer<?> metaInfoTypeSerializer = 
restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey);
+                       @SuppressWarnings("unchecked")
+                       RegisteredPriorityQueueStateBackendMetaInfo<T> 
castedMetaInfo =
+                               
(RegisteredPriorityQueueStateBackendMetaInfo<T>) metaInfoTuple.f1;
 
-                       if (metaInfoTypeSerializer != 
byteOrderedElementSerializer) {
-                               @SuppressWarnings("unchecked")
-                               TypeSerializerSnapshot<T> serializerSnapshot = 
Preconditions.checkNotNull(
-                                       (TypeSerializerSnapshot<T>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+                       TypeSerializer<T> previousElementSerializer = 
castedMetaInfo.getPreviousElementSerializer();
 
+                       if (previousElementSerializer != 
byteOrderedElementSerializer) {
                                TypeSerializerSchemaCompatibility<T> 
compatibilityResult =
-                                       
serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
+                                       
castedMetaInfo.updateElementSerializer(byteOrderedElementSerializer);
 
                                // Since priority queue elements are written 
into RocksDB
                                // as keys prefixed with the key group and 
namespace, we do not support

Reply via email to