asfgit closed pull request #7329: [FLINK-11073] (part 1) Introduce 
COMPATIBLE_WITH_RECONFIGURED_SERIALIZER option in 
TypeSerializerSchemaCompatibility
URL: https://github.com/apache/flink/pull/7329
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
index d488799cfce..4bb4aa0d8a2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
@@ -56,6 +56,12 @@
                 */
                COMPATIBLE_AFTER_MIGRATION,
 
+               /**
+                * This indicates that a reconfigured version of the new 
serializer
+                * is compatible, and should be used instead of the original 
new serializer.
+                */
+               COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+
                /**
                 * This indicates that the new serializer is incompatible, even 
with migration.
                 * This normally implies that the deserialized Java class can 
not be commonly recognized
@@ -69,6 +75,8 @@
         */
        private final Type resultType;
 
+       private final TypeSerializer<T> reconfiguredNewSerializer;
+
        /**
         * Returns a result that indicates that the new serializer is 
compatible and no migration is required.
         * The new serializer can continued to be used as is.
@@ -89,6 +97,20 @@
                return new 
TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION, null);
        }
 
+       /**
+        * Returns a result that indicates a reconfigured version of the new 
serializer is compatible, and should be
+        * used instead of the original new serializer.
+        *
+        * @param reconfiguredSerializer the reconfigured version of the new 
serializer.
+        * @return a result that indicates a reconfigured version of the new 
serializer is compatible, and should be
+        *         used instead of the original new serializer.
+        */
+       public static <T> TypeSerializerSchemaCompatibility<T> 
compatibleWithReconfiguredSerializer(TypeSerializer<T> reconfiguredSerializer) {
+               return new TypeSerializerSchemaCompatibility<>(
+                       Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+                       Preconditions.checkNotNull(reconfiguredSerializer));
+       }
+
        /**
         * Returns a result that indicates there is no possible way for the new 
serializer to be use-able.
         * This normally indicates that there is no common Java class between 
what the previous bytes can be
@@ -105,6 +127,7 @@
 
        private TypeSerializerSchemaCompatibility(Type resultType, @Nullable 
TypeSerializer<T> reconfiguredNewSerializer) {
                this.resultType = Preconditions.checkNotNull(resultType);
+               this.reconfiguredNewSerializer = reconfiguredNewSerializer;
        }
 
        /**
@@ -125,6 +148,27 @@ public boolean isCompatibleAfterMigration() {
                return resultType == Type.COMPATIBLE_AFTER_MIGRATION;
        }
 
+       /**
+        * Returns whether or not the type of the compatibility is {@link 
Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+        *
+        * @return whether or not the type of the compatibility is {@link 
Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+        */
+       public boolean isCompatibleWithReconfiguredSerializer() {
+               return resultType == 
Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER;
+       }
+
+       /**
+        * Gets the reconfigured serializer. This throws an exception if
+        * {@link #isCompatibleWithReconfiguredSerializer()} is {@code false}.
+        */
+       public TypeSerializer<T> getReconfiguredSerializer() {
+               Preconditions.checkState(
+                       isCompatibleWithReconfiguredSerializer(),
+                       "It is only possible to get a reconfigured serializer 
if the compatibility type is %s, but the type is %s",
+                       Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, 
resultType);
+               return reconfiguredNewSerializer;
+       }
+
        /**
         * Returns whether or not the type of the compatibility is {@link 
Type#INCOMPATIBLE}.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 19de2102a35..7422e654465 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -23,6 +23,8 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -52,8 +54,8 @@
        Closeable,
        CheckpointListener {
 
-       /** {@link TypeSerializer} for our key. */
-       protected final TypeSerializer<K> keySerializer;
+       /** {@link StateSerializerProvider} for our key serializer. */
+       private final StateSerializerProvider<K> keySerializerProvider;
 
        /** The currently active key. */
        private K currentKey;
@@ -104,7 +106,7 @@ public AbstractKeyedStateBackend(
                Preconditions.checkArgument(numberOfKeyGroups >= 
keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be 
at least the number in the key group range assigned to this backend");
 
                this.kvStateRegistry = kvStateRegistry;
-               this.keySerializer = Preconditions.checkNotNull(keySerializer);
+               this.keySerializerProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(keySerializer);
                this.numberOfKeyGroups = numberOfKeyGroups;
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
@@ -156,7 +158,13 @@ public void setCurrentKey(K newKey) {
         */
        @Override
        public TypeSerializer<K> getKeySerializer() {
-               return keySerializer;
+               return keySerializerProvider.currentSchemaSerializer();
+       }
+
+       public TypeSerializerSchemaCompatibility<K> 
checkKeySerializerSchemaCompatibility(
+                       TypeSerializerSnapshot<K> 
previousKeySerializerSnapshot) {
+
+               return 
keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(previousKeySerializerSnapshot);
        }
 
        /**
@@ -230,7 +238,7 @@ public KeyGroupRange getKeyGroupRange() {
                        final TypeSerializer<N> namespaceSerializer,
                        StateDescriptor<S, V> stateDescriptor) throws Exception 
{
                checkNotNull(namespaceSerializer, "Namespace serializer");
-               checkNotNull(keySerializer, "State key serializer has not been 
configured in the config. " +
+               checkNotNull(keySerializerProvider, "State key serializer has 
not been configured in the config. " +
                                "This operation cannot use partitioned state.");
 
                InternalKvState<K, ?, ?> kvState = 
keyValueStatesByName.get(stateDescriptor.getName());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 585490dc75f..6de7c03e2cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -64,7 +64,7 @@
 
        // TODO the keySerializer field should be removed, once all serializers 
have the restoreSerializer() method implemented
        private TypeSerializer<K> keySerializer;
-       private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
+       private TypeSerializerSnapshot<K> keySerializerSnapshot;
 
        private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
@@ -82,7 +82,7 @@ public KeyedBackendSerializationProxy(
                this.usingKeyGroupCompression = compression;
 
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
-               this.keySerializerConfigSnapshot = 
Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
+               this.keySerializerSnapshot = 
Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
 
                Preconditions.checkNotNull(stateMetaInfoSnapshots);
                Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE);
@@ -93,8 +93,8 @@ public KeyedBackendSerializationProxy(
                return stateMetaInfoSnapshots;
        }
 
-       public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
-               return keySerializerConfigSnapshot;
+       public TypeSerializerSnapshot<K> getKeySerializerSnapshot() {
+               return keySerializerSnapshot;
        }
 
        public boolean isUsingKeyGroupCompression() {
@@ -118,7 +118,7 @@ public void write(DataOutputView out) throws IOException {
                // write the compression format used to write each key-group
                out.writeBoolean(usingKeyGroupCompression);
 
-               
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, 
keySerializerConfigSnapshot, keySerializer);
+               
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, 
keySerializerSnapshot, keySerializer);
 
                // write individual registered keyed state metainfos
                out.writeShort(stateMetaInfoSnapshots.size());
@@ -142,14 +142,14 @@ public void read(DataInputView in) throws IOException {
 
                // only starting from version 3, we have the key serializer and 
its config snapshot written
                if (readVersion >= 6) {
-                       this.keySerializerConfigSnapshot = 
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+                       this.keySerializerSnapshot = 
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
                                in, userCodeClassLoader, null);
                } else if (readVersion >= 3) {
                        Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> 
keySerializerAndConfig =
                                        
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader).get(0);
-                       this.keySerializerConfigSnapshot = 
(TypeSerializerSnapshot<K>) keySerializerAndConfig.f1;
+                       this.keySerializerSnapshot = 
(TypeSerializerSnapshot<K>) keySerializerAndConfig.f1;
                } else {
-                       this.keySerializerConfigSnapshot = new 
BackwardsCompatibleSerializerSnapshot<>(
+                       this.keySerializerSnapshot = new 
BackwardsCompatibleSerializerSnapshot<>(
                                
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, 
true));
                }
                this.keySerializer = null;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
index ecc13faa43d..e44559a2571 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -55,8 +55,8 @@ public RegisteredBroadcastStateBackendMetaInfo(
                this(
                        name,
                        assignmentMode,
-                       StateSerializerProvider.fromNewState(keySerializer),
-                       StateSerializerProvider.fromNewState(valueSerializer));
+                       
StateSerializerProvider.fromNewRegisteredSerializer(keySerializer),
+                       
StateSerializerProvider.fromNewRegisteredSerializer(valueSerializer));
        }
 
        public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
@@ -73,10 +73,10 @@ public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot sn
                        snapshot.getName(),
                        OperatorStateHandle.Mode.valueOf(
                                
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-                       StateSerializerProvider.fromRestoredState(
+                       StateSerializerProvider.fromPreviousSerializerSnapshot(
                                (TypeSerializerSnapshot<K>) 
Preconditions.checkNotNull(
                                        
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))),
-                       StateSerializerProvider.fromRestoredState(
+                       StateSerializerProvider.fromPreviousSerializerSnapshot(
                                (TypeSerializerSnapshot<V>) 
Preconditions.checkNotNull(
                                        
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index b37c79de026..b2d1cdc49ff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -60,8 +60,8 @@ public RegisteredKeyValueStateBackendMetaInfo(
                this(
                        stateType,
                        name,
-                       
StateSerializerProvider.fromNewState(namespaceSerializer),
-                       StateSerializerProvider.fromNewState(stateSerializer),
+                       
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
+                       
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
                        null);
        }
 
@@ -75,8 +75,8 @@ public RegisteredKeyValueStateBackendMetaInfo(
                this(
                        stateType,
                        name,
-                       
StateSerializerProvider.fromNewState(namespaceSerializer),
-                       StateSerializerProvider.fromNewState(stateSerializer),
+                       
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
+                       
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
                        snapshotTransformer);
        }
 
@@ -85,10 +85,10 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot sna
                this(
                        
StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
                        snapshot.getName(),
-                       StateSerializerProvider.fromRestoredState(
+                       StateSerializerProvider.fromPreviousSerializerSnapshot(
                                (TypeSerializerSnapshot<N>) 
Preconditions.checkNotNull(
                                        
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))),
-                       StateSerializerProvider.fromRestoredState(
+                       StateSerializerProvider.fromPreviousSerializerSnapshot(
                                (TypeSerializerSnapshot<S>) 
Preconditions.checkNotNull(
                                        
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
                        null);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
index 921947a4dd0..6b83ca2693d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -56,7 +56,7 @@ public RegisteredOperatorStateBackendMetaInfo(
                        @Nonnull OperatorStateHandle.Mode assignmentMode) {
                this(
                        name,
-                       
StateSerializerProvider.fromNewState(partitionStateSerializer),
+                       
StateSerializerProvider.fromNewRegisteredSerializer(partitionStateSerializer),
                        assignmentMode);
        }
 
@@ -71,7 +71,7 @@ private RegisteredOperatorStateBackendMetaInfo(@Nonnull 
RegisteredOperatorStateB
        public RegisteredOperatorStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
                this(
                        snapshot.getName(),
-                       StateSerializerProvider.fromRestoredState(
+                       StateSerializerProvider.fromPreviousSerializerSnapshot(
                                (TypeSerializerSnapshot<S>) 
Preconditions.checkNotNull(
                                        
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
                        OperatorStateHandle.Mode.valueOf(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
index 961d96fa405..691e74c0288 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -42,14 +42,14 @@ public RegisteredPriorityQueueStateBackendMetaInfo(
                @Nonnull String name,
                @Nonnull TypeSerializer<T> elementSerializer) {
 
-               this(name, 
StateSerializerProvider.fromNewState(elementSerializer));
+               this(name, 
StateSerializerProvider.fromNewRegisteredSerializer(elementSerializer));
        }
 
        @SuppressWarnings("unchecked")
        public 
RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
                this(
                        snapshot.getName(),
-                       StateSerializerProvider.fromRestoredState(
+                       StateSerializerProvider.fromPreviousSerializerSnapshot(
                                (TypeSerializerSnapshot<T>) 
Preconditions.checkNotNull(
                                        
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
index a24f12e42fb..bad37ff8707 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -35,6 +35,21 @@
  * A {@link StateSerializerProvider} wraps logic on how to obtain serializers 
for registered state,
  * either with the previous schema of state in checkpoints or the current 
schema of state.
  *
+ * <p>A provider can be created from either a registered state serializer, or 
the snapshot
+ * of the previous state serializer. For the former case, if the state was 
restored and a
+ * snapshot of the previous state serializer was retrieved later on, the 
snapshot can be set
+ * on the provider which also additionally checks the compatibility of the 
initially registered
+ * serializer. Similarly for the latter case, if a new state serializer is 
registered later on,
+ * it can be set on the provider, which then also checks the compatibility of 
the new registered
+ * serializer.
+ *
+ * <p>Simply put, the provider works both directions - either creating it 
first with a registered
+ * serializer or the previous serializer's snapshot, and then setting the 
previous serializer's
+ * snapshot (if the provider was created with a registered serializer) or a 
new registered state
+ * serializer (if the provider was created with a serializer snapshot). Either 
way,
+ * the new registered serializer is checked for schema compatibility once both 
the new serializer
+ * and the previous serializer snapshot are present.
+ *
  * @param <T> the type of the state.
  */
 @Internal
@@ -44,12 +59,36 @@
         * The registered serializer for the state.
         *
         * <p>In the case that this provider was created from a restored 
serializer snapshot via
-        * {@link #fromRestoredState(TypeSerializerSnapshot)}, but a new 
serializer was never registered
+        * {@link #fromPreviousSerializerSnapshot(TypeSerializerSnapshot)}, but 
a new serializer was never registered
         * for the state (i.e., this is the case if a restored state was never 
accessed), this would be {@code null}.
         */
        @Nullable
        TypeSerializer<T> registeredSerializer;
 
+       /**
+        * The state's previous serializer's snapshot.
+        *
+        * <p>In the case that this provider was created from a registered 
state serializer instance via
+        * {@link #fromNewRegisteredSerializer(TypeSerializer)}, but a 
serializer snapshot was never supplied to this
+        * provider (i.e. because the registered serializer was for a new 
state, not a restored one), this
+        * would be {@code null}.
+        */
+       @Nullable
+       TypeSerializerSnapshot<T> previousSerializerSnapshot;
+
+       /**
+        * The restore serializer, lazily created only when the restore 
serializer is accessed.
+        *
+        * <p>NOTE: It is important to only create this lazily, so that off-heap
+        * state do not fail eagerly when restoring state that has a
+        * {@link UnloadableDummyTypeSerializer} as the previous serializer. 
This should
+        * be relevant only for restores from Flink versions prior to 1.7.x.
+        */
+       @Nullable
+       private TypeSerializer<T> cachedRestoredSerializer;
+
+       private boolean isRegisteredWithIncompatibleSerializer = false;
+
        /**
         * Creates a {@link StateSerializerProvider} for restored state from 
the previous serializer's snapshot.
         *
@@ -59,26 +98,36 @@
         * @param stateSerializerSnapshot the previous serializer's snapshot.
         * @param <T> the type of the state.
         *
-        * @return a new {@link StateSerializerProvider} for restored state.
+        * @return a new {@link StateSerializerProvider}.
         */
-       public static <T> StateSerializerProvider<T> 
fromRestoredState(TypeSerializerSnapshot<T> stateSerializerSnapshot) {
-               return new 
RestoredStateSerializerProvider<>(stateSerializerSnapshot);
+       public static <T> StateSerializerProvider<T> 
fromPreviousSerializerSnapshot(TypeSerializerSnapshot<T> 
stateSerializerSnapshot) {
+               return new 
LazilyRegisteredStateSerializerProvider<>(stateSerializerSnapshot);
        }
 
        /**
-        * Creates a {@link StateSerializerProvider} for new state from the 
registered state serializer.
+        * Creates a {@link StateSerializerProvider} from the registered state 
serializer.
+        *
+        * <p>If the state is a restored one, and the previous serializer's 
snapshot is
+        * obtained later on, it should be supplied via the
+        * {@link 
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)} method.
         *
         * @param registeredStateSerializer the new state's registered 
serializer.
         * @param <T> the type of the state.
         *
-        * @return a new {@link StateSerializerProvider} for new state.
+        * @return a new {@link StateSerializerProvider}.
         */
-       public static <T> StateSerializerProvider<T> 
fromNewState(TypeSerializer<T> registeredStateSerializer) {
-               return new 
NewStateSerializerProvider<>(registeredStateSerializer);
+       public static <T> StateSerializerProvider<T> 
fromNewRegisteredSerializer(TypeSerializer<T> registeredStateSerializer) {
+               return new 
EagerlyRegisteredStateSerializerProvider<>(registeredStateSerializer);
        }
 
-       private StateSerializerProvider(@Nullable TypeSerializer<T> 
stateSerializer) {
+       private StateSerializerProvider(@Nonnull TypeSerializer<T> 
stateSerializer) {
                this.registeredSerializer = stateSerializer;
+               this.previousSerializerSnapshot = null;
+       }
+
+       private StateSerializerProvider(@Nonnull TypeSerializerSnapshot<T> 
previousSerializerSnapshot) {
+               this.previousSerializerSnapshot = previousSerializerSnapshot;
+               this.registeredSerializer = null;
        }
 
        /**
@@ -92,36 +141,63 @@ private StateSerializerProvider(@Nullable 
TypeSerializer<T> stateSerializer) {
         * identical. Therefore, in this case, it is guaranteed that the 
serializer returned by
         * this method is the same as the one returned by {@link 
#previousSchemaSerializer()}.
         *
-        * <p>If this provider was created from new state, then this always 
returns the
-        * serializer that the new state was registered with.
+        * <p>If this provider was created from a serializer instance, then 
this always returns the
+        * same serializer instance. If later on a snapshot of the 
previous serializer is supplied
+        * via {@link 
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)}, then
+        * the initially supplied serializer instance will be checked for 
compatibility.
         *
         * @return a serializer that reads and writes in the current schema of 
the state.
         */
        @Nonnull
-       public abstract TypeSerializer<T> currentSchemaSerializer();
+       public final TypeSerializer<T> currentSchemaSerializer() {
+               if (registeredSerializer != null) {
+                       checkState(
+                               !isRegisteredWithIncompatibleSerializer,
+                               "Unable to provide a serializer with the 
current schema, because the restored state was " +
+                                       "registered with a new serializer that 
has incompatible schema.");
+
+                       return registeredSerializer;
+               }
+
+               // if we are not yet registered with a new serializer,
+               // we can just use the restore serializer to read / write the 
state.
+               return previousSchemaSerializer();
+       };
 
        /**
         * Gets the serializer that recognizes the previous serialization 
schema of the state.
         * This is the serializer that should be used for restoring the state, 
i.e. when the state
         * is still in the previous serialization schema.
         *
-        * <p>This method can only be used if this provider was created from a 
restored state's serializer
-        * snapshot. If this provider was created from new state, then this 
method is
-        * irrelevant, since there doesn't exist any previous version of the 
state schema.
+        * <p>This method only returns a serializer if this provider has the 
previous serializer's
+        * snapshot. Otherwise, trying to access the previous schema serializer 
will fail
+        * with an exception.
         *
         * @return a serializer that reads and writes in the previous schema of 
the state.
         */
        @Nonnull
-       public abstract TypeSerializer<T> previousSchemaSerializer();
+       public final TypeSerializer<T> previousSchemaSerializer() {
+               if (cachedRestoredSerializer != null) {
+                       return cachedRestoredSerializer;
+               }
+
+               if (previousSerializerSnapshot == null) {
+                       throw new UnsupportedOperationException(
+                               "This provider does not contain the state's 
previous serializer's snapshot. Cannot provider a serializer for previous 
schema.");
+               }
+
+               this.cachedRestoredSerializer = 
previousSerializerSnapshot.restoreSerializer();
+               return cachedRestoredSerializer;
+       };
 
        /**
         * For restored state, register a new serializer that potentially has a 
new serialization schema.
         *
         * <p>Users are allowed to register serializers for state only once. 
Therefore, this method
-        * is irrelevant if this provider was created from new state, since a 
state serializer had
+        * is irrelevant if this provider was created with a serializer 
instance, since a state serializer had
         * been registered already.
         *
-        * <p>For the case where this provider was created from restored state, 
then this method should
+        * <p>For the case where this provider was created from a serializer 
snapshot, then this method should
         * be called at most once. The new serializer will be checked for its 
schema compatibility with the
         * previous serializer's schema, and returned to the caller. The caller 
is responsible for
         * checking the result and react appropriately to it, as follows:
@@ -143,52 +219,61 @@ private StateSerializerProvider(@Nullable 
TypeSerializer<T> stateSerializer) {
        public abstract TypeSerializerSchemaCompatibility<T> 
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer);
 
        /**
-        * Implementation of the {@link StateSerializerProvider} for the 
restored state case.
+        * For restored state, set the state's previous serializer's snapshot.
+        *
+        * <p>Users are allowed to set the previous serializer's snapshot once. 
Therefore, this method
+        * is irrelevant if this provider was created with a serializer 
snapshot, since the serializer
+        * snapshot had been set already.
+        *
+        * <p>For the case where this provider was created from a serializer 
instance, then this method should
+        * be called at most once. The initially registered state serializer 
will be checked for its
+        * schema compatibility with the previous serializer's schema, and 
returned to the caller.
+        * The caller is responsible for checking the result and react 
appropriately to it, as follows:
+        * <ul>
+        *     <li>{@link 
TypeSerializerSchemaCompatibility#isCompatibleAsIs()}: nothing needs to be done.
+        *     {@link #currentSchemaSerializer()} remains to return the 
initially registered serializer.</li>
+        *     <li>{@link 
TypeSerializerSchemaCompatibility#isCompatibleAfterMigration()}: state 
needs to be
+        *     migrated before the serializer returned by {@link 
#currentSchemaSerializer()} can be used.
+        *     The migration should be performed by reading the state with 
{@link #previousSchemaSerializer()},
+        *     and then writing it again with {@link 
#currentSchemaSerializer()}.</li>
+        *     <li>{@link TypeSerializerSchemaCompatibility#isIncompatible()}: 
the registered serializer is
+        *     incompatible. {@link #currentSchemaSerializer()} can no longer 
return a serializer for
+        *     the state, and therefore this provider shouldn't be used 
anymore.</li>
+        * </ul>
+        *
+        * @param previousSerializerSnapshot the state's previous serializer's 
snapshot
+        *
+        * @return the schema compatibility of the initially registered 
serializer, with respect to the previous serializer.
         */
-       private static class RestoredStateSerializerProvider<T> extends 
StateSerializerProvider<T> {
-
-               /**
-                * The snapshot of the previous serializer of the state.
-                */
-               @Nonnull
-               private final TypeSerializerSnapshot<T> 
previousSerializerSnapshot;
-
-               private boolean isRegisteredWithIncompatibleSerializer = false;
-
-               RestoredStateSerializerProvider(TypeSerializerSnapshot<T> 
previousSerializerSnapshot) {
-                       super(null);
-                       this.previousSerializerSnapshot = 
Preconditions.checkNotNull(previousSerializerSnapshot);
-               }
-
-               /**
-                * The restore serializer, lazily created only when the restore 
serializer is accessed.
-                *
-                * <p>NOTE: It is important to only create this lazily, so that 
off-heap
-                * state do not fail eagerly when restoring state that has a
-                * {@link UnloadableDummyTypeSerializer} as the previous 
serializer. This should
-                * be relevant only for restores from Flink versions prior to 
1.7.x.
-                */
-               @Nullable
-               private TypeSerializer<T> cachedRestoredSerializer;
+       @Nonnull
+       public abstract TypeSerializerSchemaCompatibility<T> 
setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> 
previousSerializerSnapshot);
 
-               @Override
-               @Nonnull
-               public TypeSerializer<T> currentSchemaSerializer() {
-                       if (registeredSerializer != null) {
-                               checkState(
-                                       !isRegisteredWithIncompatibleSerializer,
-                                       "Unable to provide a serializer with 
the current schema, because the restored state was " +
-                                               "registered with a new 
serializer that has incompatible schema.");
+       /**
+        * Invalidates access to the current schema serializer. This lets 
{@link #currentSchemaSerializer()}
+        * fail when invoked.
+        *
+        * <p>Access to the current schema serializer should be invalidated by 
the methods
+        * {@link #registerNewSerializerForRestoredState(TypeSerializer)} or
+        * {@link 
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)}
+        * once the registered serializer is determined to be incompatible.
+        */
+       protected final void invalidateCurrentSchemaSerializerAccess() {
+               this.isRegisteredWithIncompatibleSerializer = true;
+       }
 
-                                       return registeredSerializer;
-                       }
+       /**
+        * Implementation of the {@link StateSerializerProvider} for the case 
where a snapshot of the
+        * previous serializer is obtained before a new state serializer is 
registered (hence, the naming "lazily" registered).
+        */
+       private static class LazilyRegisteredStateSerializerProvider<T> extends 
StateSerializerProvider<T> {
 
-                       // if we are not yet registered with a new serializer,
-                       // we can just use the restore serializer to read / 
write the state.
-                       return previousSchemaSerializer();
+               
LazilyRegisteredStateSerializerProvider(TypeSerializerSnapshot<T> 
previousSerializerSnapshot) {
+                       
super(Preconditions.checkNotNull(previousSerializerSnapshot));
                }
 
                @Nonnull
+               @Override
+               @SuppressWarnings("ConstantConditions")
                public TypeSerializerSchemaCompatibility<T> 
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
                        checkNotNull(newSerializer);
                        if (registeredSerializer != null) {
@@ -197,49 +282,59 @@ private StateSerializerProvider(@Nullable 
TypeSerializer<T> stateSerializer) {
 
                        TypeSerializerSchemaCompatibility<T> result = 
previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
                        if (result.isIncompatible()) {
-                               this.isRegisteredWithIncompatibleSerializer = 
true;
+                               invalidateCurrentSchemaSerializerAccess();
+                       }
+                       if (result.isCompatibleWithReconfiguredSerializer()) {
+                               this.registeredSerializer = 
result.getReconfiguredSerializer();
+                       } else {
+                               this.registeredSerializer = newSerializer;
                        }
-                       this.registeredSerializer = newSerializer;
                        return result;
                }
 
                @Nonnull
-               public final TypeSerializer<T> previousSchemaSerializer() {
-                       if (cachedRestoredSerializer != null) {
-                               return cachedRestoredSerializer;
-                       }
-
-                       this.cachedRestoredSerializer = 
previousSerializerSnapshot.restoreSerializer();
-                       return cachedRestoredSerializer;
+               @Override
+               public TypeSerializerSchemaCompatibility<T> 
setPreviousSerializerSnapshotForRestoredState(
+                               TypeSerializerSnapshot<T> 
previousSerializerSnapshot) {
+                       throw new UnsupportedOperationException("The snapshot 
of the state's previous serializer has already been set; cannot reset.");
                }
        }
 
        /**
-        * Implementation of the {@link StateSerializerProvider} for the new 
state case.
+        * Implementation of the {@link StateSerializerProvider} for the case 
where a new state
+        * serializer instance is registered first, before any snapshot of the 
previous state serializer
+        * is obtained (hence, the naming "eagerly" registered).
         */
-       private static class NewStateSerializerProvider<T> extends 
StateSerializerProvider<T> {
+       private static class EagerlyRegisteredStateSerializerProvider<T> 
extends StateSerializerProvider<T> {
 
-               NewStateSerializerProvider(TypeSerializer<T> 
registeredStateSerializer) {
+               EagerlyRegisteredStateSerializerProvider(TypeSerializer<T> 
registeredStateSerializer) {
                        
super(Preconditions.checkNotNull(registeredStateSerializer));
                }
 
-               @Override
                @Nonnull
-               @SuppressWarnings("ConstantConditions")
-               public TypeSerializer<T> currentSchemaSerializer() {
-                       return registeredSerializer;
-               }
-
                @Override
-               @Nonnull
                public TypeSerializerSchemaCompatibility<T> 
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
                        throw new UnsupportedOperationException("A serializer 
has already been registered for the state; re-registration is not allowed.");
                }
 
-               @Override
                @Nonnull
-               public TypeSerializer<T> previousSchemaSerializer() {
-                       throw new UnsupportedOperationException("This is a 
NewStateSerializerProvider; you cannot get a restore serializer because there 
was no restored state.");
+               @Override
+               public TypeSerializerSchemaCompatibility<T> 
setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T> 
previousSerializerSnapshot) {
+                       checkNotNull(previousSerializerSnapshot);
+                       if (this.previousSerializerSnapshot != null) {
+                               throw new UnsupportedOperationException("The 
snapshot of the state's previous serializer has already been set; cannot 
reset.");
+                       }
+
+                       this.previousSerializerSnapshot = 
previousSerializerSnapshot;
+
+                       TypeSerializerSchemaCompatibility<T> result = 
previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
+                       if (result.isIncompatible()) {
+                               invalidateCurrentSchemaSerializerAccess();
+                       }
+                       if (result.isCompatibleWithReconfiguredSerializer()) {
+                               this.registeredSerializer = 
result.getReconfiguredSerializer();
+                       }
+                       return result;
                }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 3f8761b657a..2499898220c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -243,7 +243,7 @@ public HeapKeyedStateBackend(
 
                        TypeSerializerSchemaCompatibility<N> 
namespaceCompatibility =
                                
restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
-                       if (!namespaceCompatibility.isCompatibleAsIs()) {
+                       if (namespaceCompatibility.isCompatibleAfterMigration() 
|| namespaceCompatibility.isIncompatible()) {
                                throw new StateMigrationException("For heap 
backends, the new namespace serializer must be compatible.");
                        }
 
@@ -302,7 +302,7 @@ private boolean hasRegisteredState() {
                }
                StateTable<K, N, SV> stateTable = tryRegisterStateTable(
                        namespaceSerializer, stateDesc, 
getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
-               return stateFactory.createState(stateDesc, stateTable, 
keySerializer);
+               return stateFactory.createState(stateDesc, stateTable, 
getKeySerializer());
        }
 
        @SuppressWarnings("unchecked")
@@ -394,8 +394,9 @@ private void 
restorePartitionedState(Collection<KeyedStateHandle> state) throws
                                if (!keySerializerRestored) {
                                        // check for key serializer 
compatibility; this also reconfigures the
                                        // key serializer to be compatible, if 
it is required and is possible
-                                       if 
(!serializationProxy.getKeySerializerConfigSnapshot()
-                                                       
.resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+                                       TypeSerializerSchemaCompatibility<K> 
keySerializerSchemaCompat =
+                                               
checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
+                                       if 
(keySerializerSchemaCompat.isCompatibleAfterMigration() || 
keySerializerSchemaCompat.isIncompatible()) {
                                                throw new 
StateMigrationException("The new key serializer must be compatible.");
                                        }
 
@@ -700,7 +701,7 @@ public boolean isAsynchronous() {
                                new KeyedBackendSerializationProxy<>(
                                        // TODO: this code assumes that writing 
a serializer is threadsafe, we should support to
                                        // get a serialized form already at 
state registration time in the future
-                                       keySerializer,
+                                       getKeySerializer(),
                                        metaInfoSnapshots,
                                        
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 55aacb23057..7e497be3555 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -75,7 +75,7 @@ public void testKeyedBackendSerializationProxyRoundtrip() 
throws Exception {
                }
 
                
Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
-               
Assert.assertTrue(serializationProxy.getKeySerializerConfigSnapshot() 
instanceof IntSerializer.IntSerializerSnapshot);
+               Assert.assertTrue(serializationProxy.getKeySerializerSnapshot() 
instanceof IntSerializer.IntSerializerSnapshot);
 
                assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList, 
serializationProxy.getStateMetaInfoSnapshots());
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 5511792673e..8d85e74aca7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -40,6 +40,7 @@
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -69,23 +70,70 @@
        private CheckpointStorageLocation checkpointStorageLocation;
 
        // 
-------------------------------------------------------------------------------
-       //  Keyed state backend migration tests
+       //  Tests for keyed ValueState
        // 
-------------------------------------------------------------------------------
 
        @Test
        public void testKeyedValueStateMigration() throws Exception {
+               final String stateName = "test-name";
+
+               testKeyedValueStateUpgrade(
+                       new ValueStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ValueStateDescriptor<>(
+                               stateName,
+                               // restore with a V2 serializer that has a 
different schema
+                               new TestType.V2TestTypeSerializer()));
+       }
+
+       @Test
+       public void testKeyedValueStateSerializerReconfiguration() throws 
Exception {
+               final String stateName = "test-name";
+
+               testKeyedValueStateUpgrade(
+                       new ValueStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ValueStateDescriptor<>(
+                               stateName,
+                               // the test fails if this serializer is used 
instead of a reconfigured new serializer
+                               new 
TestType.ReconfigurationRequiringTestTypeSerializer()));
+       }
+
+       @Test
+       public void 
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws 
Exception {
+               final String stateName = "test-name";
+
+               try {
+                       testKeyedValueStateUpgrade(
+                               new ValueStateDescriptor<>(
+                                       stateName,
+                                       new TestType.V1TestTypeSerializer()),
+                               new ValueStateDescriptor<>(
+                                       stateName,
+                                       new 
TestType.IncompatibleTestTypeSerializer()));
+
+                       Assert.fail("should have failed");
+               } catch (Exception expected) {
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
StateMigrationException.class).isPresent());
+               }
+       }
+
+       private void testKeyedValueStateUpgrade(
+                       ValueStateDescriptor<TestType> initialAccessDescriptor,
+                       ValueStateDescriptor<TestType> 
newAccessDescriptorAfterRestore) throws Exception {
+
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
 
-               final String stateName = "test-name";
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                try {
-                       ValueStateDescriptor<TestType> kvId = new 
ValueStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ValueState<TestType> valueState = backend
-                               .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+                       ValueState<TestType> valueState = 
backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               CustomVoidNamespaceSerializer.INSTANCE,
+                               initialAccessDescriptor);
 
                        backend.setCurrentKey(1);
                        valueState.update(new TestType("foo", 1456));
@@ -101,16 +149,13 @@ public void testKeyedValueStateMigration() throws 
Exception {
 
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
 
-                       // the new serializer is V2, and has a completely new 
serialization schema.
-                       kvId = new ValueStateDescriptor<>(
-                               stateName,
-                               new TestType.V2TestTypeSerializer());
-                       valueState = backend
-                               .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+                       valueState = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               CustomVoidNamespaceSerializer.INSTANCE,
+                               newAccessDescriptorAfterRestore);
 
                        snapshot.discardState();
 
-                       // the state backend should have decided whether or not 
it needs to perform state migration;
                        // make sure that reading and writing each key state 
works with the new serializer
                        backend.setCurrentKey(1);
                        Assert.assertEquals(new TestType("foo", 1456), 
valueState.value());
@@ -128,20 +173,72 @@ public void testKeyedValueStateMigration() throws 
Exception {
                }
        }
 
+       // 
-------------------------------------------------------------------------------
+       //  Tests for keyed ListState
+       // 
-------------------------------------------------------------------------------
+
        @Test
        public void testKeyedListStateMigration() throws Exception {
+               final String stateName = "test-name";
+
+               testKeyedListStateUpgrade(
+                       new ListStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ListStateDescriptor<>(
+                               stateName,
+                               // restore with a V2 serializer that has a 
different schema
+                               new TestType.V2TestTypeSerializer()));
+       }
+
+       @Test
+       @Ignore("This currently doesn't pass because the ListSerializer doesn't 
respect the reconfigured case, yet.")
+       public void testKeyedListStateSerializerReconfiguration() throws 
Exception {
+               final String stateName = "test-name";
+
+               testKeyedListStateUpgrade(
+                       new ListStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ListStateDescriptor<>(
+                               stateName,
+                               // the test fails if this serializer is used 
instead of a reconfigured new serializer
+                               new 
TestType.ReconfigurationRequiringTestTypeSerializer()));
+       }
+
+       @Test
+       public void 
testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws 
Exception {
+               final String stateName = "test-name";
+
+               try {
+                       testKeyedListStateUpgrade(
+                               new ListStateDescriptor<>(
+                                       stateName,
+                                       new TestType.V1TestTypeSerializer()),
+                               new ListStateDescriptor<>(
+                                       stateName,
+                                       new 
TestType.IncompatibleTestTypeSerializer()));
+
+                       Assert.fail("should have failed");
+               } catch (Exception expected) {
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
StateMigrationException.class).isPresent());
+               }
+       }
+
+       private void testKeyedListStateUpgrade(
+                       ListStateDescriptor<TestType> initialAccessDescriptor,
+                       ListStateDescriptor<TestType> 
newAccessDescriptorAfterRestore) throws Exception {
+
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
 
-               final String stateName = "test-name";
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                try {
-                       ListStateDescriptor<TestType> kvId = new 
ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ListState<TestType> listState = backend
-                               .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+                       ListState<TestType> listState = 
backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               CustomVoidNamespaceSerializer.INSTANCE,
+                               initialAccessDescriptor);
 
                        backend.setCurrentKey(1);
                        listState.add(new TestType("key-1", 1));
@@ -162,16 +259,13 @@ public void testKeyedListStateMigration() throws 
Exception {
 
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
 
-                       // the new serializer is V2, and has a completely new 
serialization schema.
-                       kvId = new ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V2TestTypeSerializer());
-                       listState = backend
-                               .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+                       listState = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               CustomVoidNamespaceSerializer.INSTANCE,
+                               newAccessDescriptorAfterRestore);
 
                        snapshot.discardState();
 
-                       // the state backend should have decided whether or not 
it needs to perform state migration;
                        // make sure that reading and writing each key state 
works with the new serializer
                        backend.setCurrentKey(1);
                        Iterator<TestType> iterable1 = 
listState.get().iterator();
@@ -198,27 +292,24 @@ public void testKeyedListStateMigration() throws 
Exception {
                }
        }
 
+       // 
-------------------------------------------------------------------------------
+       //  Tests for keyed priority queue state
+       // 
-------------------------------------------------------------------------------
+
        @Test
-       public void 
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws 
Exception {
+       public void 
testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws 
Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
 
-               final String stateName = "test-name";
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                try {
-                       ValueStateDescriptor<TestType> kvId = new 
ValueStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ValueState<TestType> valueState = backend
-                               .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+                       InternalPriorityQueue<TestType> internalPriorityQueue = 
backend.create(
+                               "testPriorityQueue", new 
TestType.V1TestTypeSerializer());
 
-                       backend.setCurrentKey(1);
-                       valueState.update(new TestType("foo", 1456));
-                       backend.setCurrentKey(2);
-                       valueState.update(new TestType("bar", 478));
-                       backend.setCurrentKey(3);
-                       valueState.update(new TestType("hello", 189));
+                       internalPriorityQueue.add(new TestType("key-1", 123));
+                       internalPriorityQueue.add(new TestType("key-2", 346));
+                       internalPriorityQueue.add(new TestType("key-1", 777));
 
                        KeyedStateHandle snapshot = runSnapshot(
                                backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
@@ -226,110 +317,64 @@ public void 
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatib
                        backend.dispose();
 
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
-
-                       kvId = new ValueStateDescriptor<>(
-                               stateName,
-                               new TestType.IncompatibleTestTypeSerializer());
-
-                       // the new serializer is INCOMPATIBLE, so registering 
the state should fail
-                       backend.getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+                       backend.create(
+                               "testPriorityQueue", new 
TestType.IncompatibleTestTypeSerializer());
 
                        Assert.fail("should have failed");
                } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-               }finally {
+               } finally {
                        backend.dispose();
                }
        }
 
-       @Test
-       public void 
testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws 
Exception {
-               CheckpointStreamFactory streamFactory = createStreamFactory();
-               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-
-               final String stateName = "test-name";
-               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+       // 
-------------------------------------------------------------------------------
+       //  Tests for key serializer in keyed state backends
+       // 
-------------------------------------------------------------------------------
 
+       @Test
+       public void 
testStateBackendRestoreFailsIfNewKeySerializerRequiresMigration() throws 
Exception {
                try {
-                       ListStateDescriptor<TestType> kvId = new 
ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ListState<TestType> listState = backend
-                               .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
-
-                       backend.setCurrentKey(1);
-                       listState.add(new TestType("key-1", 1));
-                       listState.add(new TestType("key-1", 2));
-                       listState.add(new TestType("key-1", 3));
-
-                       backend.setCurrentKey(2);
-                       listState.add(new TestType("key-2", 1));
-
-                       backend.setCurrentKey(3);
-                       listState.add(new TestType("key-3", 1));
-                       listState.add(new TestType("key-3", 2));
-
-                       KeyedStateHandle snapshot = runSnapshot(
-                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
-                               sharedStateRegistry);
-                       backend.dispose();
-
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
-
-                       kvId = new ListStateDescriptor<>(
-                               stateName,
-                               new TestType.IncompatibleTestTypeSerializer());
-
-                       // the new serializer is INCOMPATIBLE, so registering 
the state should fail
-                       backend.getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+                       testKeySerializerUpgrade(
+                               new TestType.V1TestTypeSerializer(),
+                               new TestType.V2TestTypeSerializer());
 
                        Assert.fail("should have failed");
-               } catch (Exception e) {
-                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-               } finally {
-                       backend.dispose();
+               } catch (Exception expected) {
+                       // the new key serializer requires migration; this 
should fail the restore
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
StateMigrationException.class).isPresent());
                }
        }
 
        @Test
-       public void 
testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws 
Exception {
-               CheckpointStreamFactory streamFactory = createStreamFactory();
-               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-
-               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+       public void 
testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration() 
throws Exception {
+               testKeySerializerUpgrade(
+                       new TestType.V1TestTypeSerializer(),
+                       new 
TestType.ReconfigurationRequiringTestTypeSerializer());
+       }
 
+       @Test
+       public void 
testStateBackendRestoreFailsIfNewKeySerializerIsIncompatible() throws Exception 
{
                try {
-                       InternalPriorityQueue<TestType> internalPriorityQueue = 
backend.create(
-                               "testPriorityQueue", new 
TestType.V1TestTypeSerializer());
-
-                       internalPriorityQueue.add(new TestType("key-1", 123));
-                       internalPriorityQueue.add(new TestType("key-2", 346));
-                       internalPriorityQueue.add(new TestType("key-1", 777));
-
-                       KeyedStateHandle snapshot = runSnapshot(
-                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
-                               sharedStateRegistry);
-                       backend.dispose();
-
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
-                       backend.create(
-                               "testPriorityQueue", new 
TestType.IncompatibleTestTypeSerializer());
+                       testKeySerializerUpgrade(
+                               new TestType.V1TestTypeSerializer(),
+                               new TestType.IncompatibleTestTypeSerializer());
 
                        Assert.fail("should have failed");
-               } catch (Exception e) {
-                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-               } finally {
-                       backend.dispose();
+               } catch (Exception expected) {
+                       // the new key serializer is incompatible; this should 
fail the restore
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
StateMigrationException.class).isPresent());
                }
        }
 
-       @Test
-       public void 
testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() throws 
Exception {
+       private void testKeySerializerUpgrade(
+                       TypeSerializer<TestType> initialKeySerializer,
+                       TypeSerializer<TestType> newKeySerializer) throws 
Exception {
+
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
 
-               AbstractKeyedStateBackend<TestType> backend = 
createKeyedBackend(
-                       new TestType.V1TestTypeSerializer());
+               AbstractKeyedStateBackend<TestType> backend = 
createKeyedBackend(initialKeySerializer);
 
                final String stateName = "test-name";
                try {
@@ -347,30 +392,66 @@ public void 
testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() thr
                                sharedStateRegistry);
                        backend.dispose();
 
-                       try {
-                               // the new key serializer is incompatible; this 
should fail the restore
-                               restoreKeyedBackend(new 
TestType.IncompatibleTestTypeSerializer(), snapshot);
+                       backend = restoreKeyedBackend(newKeySerializer, 
snapshot);
 
-                               Assert.fail("should have failed");
-                       } catch (Exception e) {
-                               
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-                       }
+                       valueState = backend
+                               .getPartitionedState(VoidNamespace.INSTANCE, 
CustomVoidNamespaceSerializer.INSTANCE, kvId);
 
-                       try {
-                               // the new key serializer requires migration; 
this should fail the restore
-                               restoreKeyedBackend(new 
TestType.V2TestTypeSerializer(), snapshot);
+                       // access and check previous state
+                       backend.setCurrentKey(new TestType("foo", 123));
+                       Assert.assertEquals(1, valueState.value().intValue());
+                       backend.setCurrentKey(new TestType("bar", 456));
+                       Assert.assertEquals(5, valueState.value().intValue());
 
-                               Assert.fail("should have failed");
-                       } catch (Exception e) {
-                               
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-                       }
+                       snapshot.discardState();
                } finally {
                        backend.dispose();
                }
        }
 
+       // -------------------------------------------------------------------------------
+       //  Tests for namespace serializer in keyed state backends
+       // -------------------------------------------------------------------------------
+
        @Test
-       public void 
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatible() throws 
Exception {
+       public void 
testKeyedStateRegistrationFailsIfNewNamespaceSerializerRequiresMigration() 
throws Exception {
+               try {
+                       testNamespaceSerializerUpgrade(
+                               new TestType.V1TestTypeSerializer(),
+                               new TestType.V2TestTypeSerializer());
+
+                       Assert.fail("should have failed");
+               } catch (Exception expected) {
+                       // the new namespace serializer requires migration; this should fail the restore
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
StateMigrationException.class).isPresent());
+               }
+       }
+
+       @Test
+       public void 
testKeyedStateRegistrationSucceedsIfNewNamespaceSerializerRequiresReconfiguration()
 throws Exception {
+               testNamespaceSerializerUpgrade(
+                       new TestType.V1TestTypeSerializer(),
+                       new 
TestType.ReconfigurationRequiringTestTypeSerializer());
+       }
+
+       @Test
+       public void 
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsIncompatible() throws 
Exception {
+               try {
+                       testNamespaceSerializerUpgrade(
+                               new TestType.V1TestTypeSerializer(),
+                               new TestType.IncompatibleTestTypeSerializer());
+
+                       Assert.fail("should have failed");
+               } catch (Exception expected) {
+                       // the new namespace serializer is incompatible; this should fail the restore
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
StateMigrationException.class).isPresent());
+               }
+       }
+
+       private void testNamespaceSerializerUpgrade(
+                       TypeSerializer<TestType> initialNamespaceSerializer,
+                       TypeSerializer<TestType> 
newNamespaceSerializerAfterRestore) throws Exception {
+
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
 
@@ -382,7 +463,7 @@ public void 
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatib
                        ValueState<Integer> valueState = backend
                                .getPartitionedState(
                                        new TestType("namespace", 123),
-                                       new TestType.V1TestTypeSerializer(),
+                                       initialNamespaceSerializer,
                                        kvId);
 
                        backend.setCurrentKey(1);
@@ -397,53 +478,87 @@ public void 
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatib
                        // test incompatible namespace serializer; start with a freshly restored backend
                        backend.dispose();
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
-                       try {
-                               // the new namespace serializer is 
incompatible; this should fail the restore
-                               backend.getPartitionedState(
-                                       new TestType("namespace", 123),
-                                       new 
TestType.IncompatibleTestTypeSerializer(),
-                                       kvId);
 
-                               Assert.fail("should have failed");
-                       } catch (Exception e) {
-                               
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-                       }
+                       valueState = backend.getPartitionedState(
+                               new TestType("namespace", 123),
+                               newNamespaceSerializerAfterRestore,
+                               kvId);
 
-                       // test namespace serializer that requires migration; 
start with a freshly restored backend
-                       backend.dispose();
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
-                       try {
-                               // the new namespace serializer requires 
migration; this should fail the restore
-                               backend.getPartitionedState(
-                                       new TestType("namespace", 123),
-                                       new TestType.V2TestTypeSerializer(),
-                                       kvId);
+                       // access and check previous state
+                       backend.setCurrentKey(1);
+                       Assert.assertEquals(10, valueState.value().intValue());
+                       valueState.update(10);
+                       backend.setCurrentKey(5);
+                       Assert.assertEquals(50, valueState.value().intValue());
 
-                               Assert.fail("should have failed");
-                       } catch (Exception e) {
-                               
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-                       }
+                       snapshot.discardState();
                } finally {
                        backend.dispose();
                }
        }
 
        // -------------------------------------------------------------------------------
-       //  Operator state backend migration tests
+       //  Operator state backend partitionable list state tests
        // -------------------------------------------------------------------------------
 
        @Test
        public void testOperatorParitionableListStateMigration() throws 
Exception {
+               final String stateName = "partitionable-list-state";
+
+               testOperatorPartitionableListStateUpgrade(
+                       new ListStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ListStateDescriptor<>(
+                               stateName,
+                               // restore with a V2 serializer that has a different schema
+                               new TestType.V2TestTypeSerializer()));
+       }
+
+       @Test
+       public void 
testOperatorParitionableListStateSerializerReconfiguration() throws Exception {
+               final String stateName = "partitionable-list-state";
+
+               testOperatorPartitionableListStateUpgrade(
+                       new ListStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ListStateDescriptor<>(
+                               stateName,
+                               // restore with a new serializer that requires reconfiguration
+                               new 
TestType.ReconfigurationRequiringTestTypeSerializer()));
+       }
+
+       @Test
+       public void 
testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible()
 throws Exception {
+               final String stateName = "partitionable-list-state";
+
+               try {
+                       testOperatorPartitionableListStateUpgrade(
+                               new ListStateDescriptor<>(
+                                       stateName,
+                                       new TestType.V1TestTypeSerializer()),
+                               new ListStateDescriptor<>(
+                                       stateName,
+                                       // restore with a new incompatible serializer
+                                       new 
TestType.IncompatibleTestTypeSerializer()));
+
+                       Assert.fail("should have failed.");
+               } catch (Exception e) {
+                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
+               }
+       }
+
+       private void testOperatorPartitionableListStateUpgrade(
+                       ListStateDescriptor<TestType> initialAccessDescriptor,
+                       ListStateDescriptor<TestType> 
newAccessDescriptorAfterRestore) throws Exception {
+
                CheckpointStreamFactory streamFactory = createStreamFactory();
 
                OperatorStateBackend backend = createOperatorStateBackend();
 
-               final String stateName = "partitionable-list-state";
                try {
-                       ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ListState<TestType> state = 
backend.getListState(descriptor);
+                       ListState<TestType> state = 
backend.getListState(initialAccessDescriptor);
 
                        state.add(new TestType("foo", 13));
                        state.add(new TestType("bar", 278));
@@ -454,12 +569,8 @@ public void testOperatorParitionableListStateMigration() 
throws Exception {
 
                        backend = restoreOperatorStateBackend(snapshot);
 
-                       descriptor = new ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V2TestTypeSerializer());
-                       state = backend.getListState(descriptor);
+                       state = 
backend.getListState(newAccessDescriptorAfterRestore);
 
-                       // the state backend should have decided whether or not 
it needs to perform state migration;
                        // make sure that reading and writing each state partition works with the new serializer
                        Iterator<TestType> iterator = state.get().iterator();
                        Assert.assertEquals(new TestType("foo", 13), 
iterator.next());
@@ -471,18 +582,69 @@ public void testOperatorParitionableListStateMigration() 
throws Exception {
                }
        }
 
+       // -------------------------------------------------------------------------------
+       //  Operator state backend union list state tests
+       // -------------------------------------------------------------------------------
+
+       @Test
+       public void testOperatorUnionListStateMigration() throws Exception {
+               final String stateName = "union-list-state";
+
+               testOperatorUnionListStateUpgrade(
+                       new ListStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ListStateDescriptor<>(
+                               stateName,
+                               // restore with a V2 serializer that has a different schema
+                               new TestType.V2TestTypeSerializer()));
+       }
+
        @Test
-       public void testUnionListStateMigration() throws Exception {
+       public void testOperatorUnionListStateSerializerReconfiguration() 
throws Exception {
+               final String stateName = "union-list-state";
+
+               testOperatorUnionListStateUpgrade(
+                       new ListStateDescriptor<>(
+                               stateName,
+                               new TestType.V1TestTypeSerializer()),
+                       new ListStateDescriptor<>(
+                               stateName,
+                               // restore with a new serializer that requires reconfiguration
+                               new 
TestType.ReconfigurationRequiringTestTypeSerializer()));
+       }
+
+
+       @Test
+       public void 
testOperatorUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() 
throws Exception {
+               final String stateName = "union-list-state";
+
+               try {
+                       testOperatorUnionListStateUpgrade(
+                               new ListStateDescriptor<>(
+                                       stateName,
+                                       new TestType.V1TestTypeSerializer()),
+                               new ListStateDescriptor<>(
+                                       stateName,
+                                       // restore with a new incompatible serializer
+                                       new 
TestType.IncompatibleTestTypeSerializer()));
+
+                       Assert.fail("should have failed.");
+               } catch (Exception e) {
+                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
+               }
+       }
+
+       private void testOperatorUnionListStateUpgrade(
+                       ListStateDescriptor<TestType> initialAccessDescriptor,
+                       ListStateDescriptor<TestType> 
newAccessDescriptorAfterRestore) throws Exception {
+
                CheckpointStreamFactory streamFactory = createStreamFactory();
 
                OperatorStateBackend backend = createOperatorStateBackend();
 
-               final String stateName = "union-list-state";
                try {
-                       ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ListState<TestType> state = 
backend.getUnionListState(descriptor);
+                       ListState<TestType> state = 
backend.getUnionListState(initialAccessDescriptor);
 
                        state.add(new TestType("foo", 13));
                        state.add(new TestType("bar", 278));
@@ -493,10 +655,7 @@ public void testUnionListStateMigration() throws Exception 
{
 
                        backend = restoreOperatorStateBackend(snapshot);
 
-                       descriptor = new ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V2TestTypeSerializer());
-                       state = backend.getUnionListState(descriptor);
+                       state = 
backend.getUnionListState(newAccessDescriptorAfterRestore);
 
                        // the state backend should have decided whether or not it needs to perform state migration;
                        // make sure that reading and writing each state partition works with the new serializer
@@ -510,171 +669,128 @@ public void testUnionListStateMigration() throws 
Exception {
                }
        }
 
+       // -------------------------------------------------------------------------------
+       //  Operator state backend broadcast state tests
+       // -------------------------------------------------------------------------------
+
        @Test
        public void testBroadcastStateValueMigration() throws Exception {
-               CheckpointStreamFactory streamFactory = createStreamFactory();
-
-               OperatorStateBackend backend = createOperatorStateBackend();
-
                final String stateName = "broadcast-state";
-               try {
-                       MapStateDescriptor<Integer, TestType> descriptor = new 
MapStateDescriptor<>(
+
+               testBroadcastStateValueUpgrade(
+                       new MapStateDescriptor<>(
                                stateName,
                                IntSerializer.INSTANCE,
-                               new TestType.V1TestTypeSerializer());
-                       BroadcastState<Integer, TestType> state = 
backend.getBroadcastState(descriptor);
-
-                       state.put(3, new TestType("foo", 13));
-                       state.put(5, new TestType("bar", 278));
-
-                       OperatorStateHandle snapshot = runSnapshot(
-                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
-                       backend.dispose();
-
-                       backend = restoreOperatorStateBackend(snapshot);
-
-                       descriptor = new MapStateDescriptor<>(
+                               new TestType.V1TestTypeSerializer()),
+                       new MapStateDescriptor<>(
                                stateName,
                                IntSerializer.INSTANCE,
-                               new TestType.V2TestTypeSerializer());
-                       state = backend.getBroadcastState(descriptor);
-
-                       // the state backend should have decided whether or not 
it needs to perform state migration;
-                       // make sure that reading and writing each broadcast 
entry works with the new serializer
-                       Assert.assertEquals(new TestType("foo", 13), 
state.get(3));
-                       Assert.assertEquals(new TestType("bar", 278), 
state.get(5));
-                       state.put(17, new TestType("new-entry", 777));
-               } finally {
-                       backend.dispose();
-               }
+                               // new value serializer is a V2 serializer with a different schema
+                               new TestType.V2TestTypeSerializer()));
        }
 
+
        @Test
        public void testBroadcastStateKeyMigration() throws Exception {
-               CheckpointStreamFactory streamFactory = createStreamFactory();
-
-               OperatorStateBackend backend = createOperatorStateBackend();
-
                final String stateName = "broadcast-state";
-               try {
-                       MapStateDescriptor<TestType, Integer> descriptor = new 
MapStateDescriptor<>(
+
+               testBroadcastStateKeyUpgrade(
+                       new MapStateDescriptor<>(
                                stateName,
                                new TestType.V1TestTypeSerializer(),
-                               IntSerializer.INSTANCE);
-                       BroadcastState<TestType, Integer> state = 
backend.getBroadcastState(descriptor);
-
-                       state.put(new TestType("foo", 13), 3);
-                       state.put(new TestType("bar", 278), 5);
-
-                       OperatorStateHandle snapshot = runSnapshot(
-                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
-                       backend.dispose();
-
-                       backend = restoreOperatorStateBackend(snapshot);
-
-                       descriptor = new MapStateDescriptor<>(
+                               IntSerializer.INSTANCE),
+                       new MapStateDescriptor<>(
                                stateName,
+                               // new key serializer is a V2 serializer with a different schema
                                new TestType.V2TestTypeSerializer(),
-                               IntSerializer.INSTANCE);
-                       state = backend.getBroadcastState(descriptor);
-
-                       // the state backend should have decided whether or not 
it needs to perform state migration;
-                       // make sure that reading and writing each broadcast 
entry works with the new serializer
-                       Assert.assertEquals((Integer) 3, state.get(new 
TestType("foo", 13)));
-                       Assert.assertEquals((Integer) 5, state.get(new 
TestType("bar", 278)));
-                       state.put(new TestType("new-entry", 777), 17);
-               } finally {
-                       backend.dispose();
-               }
+                               IntSerializer.INSTANCE));
        }
 
        @Test
-       public void 
testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible()
 throws Exception {
-               CheckpointStreamFactory streamFactory = createStreamFactory();
-
-               OperatorStateBackend backend = createOperatorStateBackend();
+       public void testBroadcastStateValueSerializerReconfiguration() throws 
Exception {
+               final String stateName = "broadcast-state";
 
-               final String stateName = "partitionable-list-state";
-               try {
-                       ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
+               testBroadcastStateValueUpgrade(
+                       new MapStateDescriptor<>(
                                stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ListState<TestType> state = 
backend.getListState(descriptor);
-
-                       state.add(new TestType("foo", 13));
-                       state.add(new TestType("bar", 278));
-
-                       OperatorStateHandle snapshot = runSnapshot(
-                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
-                       backend.dispose();
+                               IntSerializer.INSTANCE,
+                               new TestType.V1TestTypeSerializer()),
+                       new MapStateDescriptor<>(
+                               stateName,
+                               IntSerializer.INSTANCE,
+                               // new value serializer is a new serializer that requires reconfiguration
+                               new 
TestType.ReconfigurationRequiringTestTypeSerializer()));
+       }
 
-                       backend = restoreOperatorStateBackend(snapshot);
+       @Test
+       public void testBroadcastStateKeySerializerReconfiguration() throws 
Exception {
+               final String stateName = "broadcast-state";
 
-                       descriptor = new ListStateDescriptor<>(
+               testBroadcastStateKeyUpgrade(
+                       new MapStateDescriptor<>(
                                stateName,
-                               new TestType.IncompatibleTestTypeSerializer());
+                               new TestType.V1TestTypeSerializer(),
+                               IntSerializer.INSTANCE),
+                       new MapStateDescriptor<>(
+                               stateName,
+                               // new key serializer is a new serializer that requires reconfiguration
+                               new 
TestType.ReconfigurationRequiringTestTypeSerializer(),
+                               IntSerializer.INSTANCE));
+       }
 
-                       // the new serializer is INCOMPATIBLE, so registering 
the state should fail
-                       backend.getListState(descriptor);
+       @Test
+       public void 
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() throws 
Exception {
+               final String stateName = "broadcast-state";
+
+               try {
+                       testBroadcastStateValueUpgrade(
+                               new MapStateDescriptor<>(
+                                       stateName,
+                                       IntSerializer.INSTANCE,
+                                       new TestType.V1TestTypeSerializer()),
+                               new MapStateDescriptor<>(
+                                       stateName,
+                                       IntSerializer.INSTANCE,
+                                       // new value serializer is incompatible
+                                       new 
TestType.IncompatibleTestTypeSerializer()));
 
                        Assert.fail("should have failed.");
                } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-               } finally {
-                       backend.dispose();
                }
        }
 
        @Test
-       public void 
testUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() throws 
Exception {
-               CheckpointStreamFactory streamFactory = createStreamFactory();
-
-               OperatorStateBackend backend = createOperatorStateBackend();
+       public void 
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() throws 
Exception {
+               final String stateName = "broadcast-state";
 
-               final String stateName = "union-list-state";
                try {
-                       ListStateDescriptor<TestType> descriptor = new 
ListStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer());
-                       ListState<TestType> state = 
backend.getUnionListState(descriptor);
-
-                       state.add(new TestType("foo", 13));
-                       state.add(new TestType("bar", 278));
-
-                       OperatorStateHandle snapshot = runSnapshot(
-                               backend.snapshot(1L, 2L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
-                       backend.dispose();
-
-                       backend = restoreOperatorStateBackend(snapshot);
-
-                       descriptor = new ListStateDescriptor<>(
-                               stateName,
-                               new TestType.IncompatibleTestTypeSerializer());
-
-                       // the new serializer is INCOMPATIBLE, so registering 
the state should fail
-                       backend.getUnionListState(descriptor);
+                       testBroadcastStateKeyUpgrade(
+                               new MapStateDescriptor<>(
+                                       stateName,
+                                       new TestType.V1TestTypeSerializer(),
+                                       IntSerializer.INSTANCE),
+                               new MapStateDescriptor<>(
+                                       stateName,
+                                       // new key serializer is incompatible
+                                       new 
TestType.IncompatibleTestTypeSerializer(),
+                                       IntSerializer.INSTANCE));
 
                        Assert.fail("should have failed.");
                } catch (Exception e) {
                        Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
-               } finally {
-                       backend.dispose();
                }
        }
 
-       @Test
-       public void 
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() throws 
Exception {
+       private void testBroadcastStateValueUpgrade(
+                       MapStateDescriptor<Integer, TestType> 
initialAccessDescriptor,
+                       MapStateDescriptor<Integer, TestType> 
newAccessDescriptorAfterRestore) throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
 
                OperatorStateBackend backend = createOperatorStateBackend();
 
-               final String stateName = "broadcast-state";
                try {
-                       MapStateDescriptor<Integer, TestType> descriptor = new 
MapStateDescriptor<>(
-                               stateName,
-                               IntSerializer.INSTANCE,
-                               new TestType.V1TestTypeSerializer());
-                       BroadcastState<Integer, TestType> state = 
backend.getBroadcastState(descriptor);
+                       BroadcastState<Integer, TestType> state = 
backend.getBroadcastState(initialAccessDescriptor);
 
                        state.put(3, new TestType("foo", 13));
                        state.put(5, new TestType("bar", 278));
@@ -685,35 +801,28 @@ public void 
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatibl
 
                        backend = restoreOperatorStateBackend(snapshot);
 
-                       descriptor = new MapStateDescriptor<>(
-                               stateName,
-                               IntSerializer.INSTANCE,
-                               new TestType.IncompatibleTestTypeSerializer());
+                       state = 
backend.getBroadcastState(newAccessDescriptorAfterRestore);
 
-                       // the new value serializer is INCOMPATIBLE, so 
registering the state should fail
-                       backend.getBroadcastState(descriptor);
-
-                       Assert.fail("should have failed.");
-               } catch (Exception e) {
-                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
+                       // the state backend should have decided whether or not it needs to perform state migration;
+                       // make sure that reading and writing each broadcast entry works with the new serializer
+                       Assert.assertEquals(new TestType("foo", 13), 
state.get(3));
+                       Assert.assertEquals(new TestType("bar", 278), 
state.get(5));
+                       state.put(17, new TestType("new-entry", 777));
                } finally {
                        backend.dispose();
                }
        }
 
-       @Test
-       public void 
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() throws 
Exception {
+       private void testBroadcastStateKeyUpgrade(
+                       MapStateDescriptor<TestType, Integer> 
initialAccessDescriptor,
+                       MapStateDescriptor<TestType, Integer> 
newAccessDescriptorAfterRestore) throws Exception {
+
                CheckpointStreamFactory streamFactory = createStreamFactory();
 
                OperatorStateBackend backend = createOperatorStateBackend();
 
-               final String stateName = "broadcast-state";
                try {
-                       MapStateDescriptor<TestType, Integer> descriptor = new 
MapStateDescriptor<>(
-                               stateName,
-                               new TestType.V1TestTypeSerializer(),
-                               IntSerializer.INSTANCE);
-                       BroadcastState<TestType, Integer> state = 
backend.getBroadcastState(descriptor);
+                       BroadcastState<TestType, Integer> state = 
backend.getBroadcastState(initialAccessDescriptor);
 
                        state.put(new TestType("foo", 13), 3);
                        state.put(new TestType("bar", 278), 5);
@@ -724,17 +833,13 @@ public void 
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible(
 
                        backend = restoreOperatorStateBackend(snapshot);
 
-                       descriptor = new MapStateDescriptor<>(
-                               stateName,
-                               new TestType.IncompatibleTestTypeSerializer(),
-                               IntSerializer.INSTANCE);
-
-                       // the new key serializer is INCOMPATIBLE, so 
registering the state should fail
-                       backend.getBroadcastState(descriptor);
+                       state = 
backend.getBroadcastState(newAccessDescriptorAfterRestore);
 
-                       Assert.fail("should have failed.");
-               } catch (Exception e) {
-                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
StateMigrationException.class).isPresent());
+                       // the state backend should have decided whether or not 
it needs to perform state migration;
+                       // make sure that reading and writing each broadcast 
entry works with the new serializer
+                       Assert.assertEquals((Integer) 3, state.get(new 
TestType("foo", 13)));
+                       Assert.assertEquals((Integer) 5, state.get(new 
TestType("bar", 278)));
+                       state.put(new TestType("new-entry", 777), 17);
                } finally {
                        backend.dispose();
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
index de1f2bd962c..91ebc958ae6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
@@ -42,15 +42,15 @@
        // 
--------------------------------------------------------------------------------
 
        @Test
-       public void testCurrentSchemaSerializerForNewStateSerializerProvider() {
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+       public void 
testCurrentSchemaSerializerForEagerlyRegisteredStateSerializerProvider() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(new 
TestType.V1TestTypeSerializer());
                assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
        }
 
        @Test
-       public void 
testCurrentSchemaSerializerForRestoredStateSerializerProvider() {
+       public void 
testCurrentSchemaSerializerForLazilyRegisteredStateSerializerProvider() {
                TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
                assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
        }
 
@@ -59,17 +59,17 @@ public void 
testCurrentSchemaSerializerForRestoredStateSerializerProvider() {
        // 
--------------------------------------------------------------------------------
 
        @Test(expected = UnsupportedOperationException.class)
-       public void testPreviousSchemaSerializerForNewStateSerializerProvider() 
{
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+       public void 
testPreviousSchemaSerializerForEagerlyRegisteredStateSerializerProvider() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(new 
TestType.V1TestTypeSerializer());
 
                // this should fail with an exception
                testProvider.previousSchemaSerializer();
        }
 
        @Test
-       public void 
testPreviousSchemaSerializerForRestoredStateSerializerProvider() {
+       public void 
testPreviousSchemaSerializerForLazilyRegisteredStateSerializerProvider() {
                TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
                assertTrue(testProvider.previousSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
        }
 
@@ -78,7 +78,7 @@ public void testLazyInstantiationOfPreviousSchemaSerializer() 
{
                // create the provider with an exception throwing snapshot;
                // this would throw an exception if the restore serializer was 
eagerly accessed
                StateSerializerProvider<String> testProvider =
-                       StateSerializerProvider.fromRestoredState(new 
ExceptionThrowingSerializerSnapshot());
+                       
StateSerializerProvider.fromPreviousSerializerSnapshot(new 
ExceptionThrowingSerializerSnapshot());
 
                try {
                        // if we fail here, that means the restore serializer 
was indeed lazily accessed
@@ -94,15 +94,15 @@ public void 
testLazyInstantiationOfPreviousSchemaSerializer() {
        // 
--------------------------------------------------------------------------------
 
        @Test(expected = UnsupportedOperationException.class)
-       public void 
testRegisterNewSerializerWithNewStateSerializerProviderShouldFail() {
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+       public void 
testRegisterNewSerializerWithEagerlyRegisteredStateSerializerProviderShouldFail()
 {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(new 
TestType.V1TestTypeSerializer());
                testProvider.registerNewSerializerForRestoredState(new 
TestType.V2TestTypeSerializer());
        }
 
        @Test(expected = UnsupportedOperationException.class)
-       public void 
testRegisterNewSerializerTwiceWithNewStateSerializerProviderShouldFail() {
+       public void 
testRegisterNewSerializerTwiceWithLazilyRegisteredStateSerializerProviderShouldFail()
 {
                TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
                testProvider.registerNewSerializerForRestoredState(new 
TestType.V2TestTypeSerializer());
 
@@ -111,9 +111,9 @@ public void 
testRegisterNewSerializerTwiceWithNewStateSerializerProviderShouldFa
        }
 
        @Test
-       public void testRegisterNewCompatibleAsIsSerializer() {
+       public void testLazilyRegisterNewCompatibleAsIsSerializer() {
                TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
                // register compatible serializer for state
                TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
@@ -125,20 +125,36 @@ public void testRegisterNewCompatibleAsIsSerializer() {
        }
 
        @Test
-       public void testRegisterNewCompatibleAfterMigrationSerializer() {
+       public void testLazilyRegisterNewCompatibleAfterMigrationSerializer() {
                TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
                // register serializer that requires migration for state
                TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
                        testProvider.registerNewSerializerForRestoredState(new 
TestType.V2TestTypeSerializer());
                assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+
+               assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V2TestTypeSerializer);
+               assertTrue(testProvider.previousSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testLazilyRegisterNewSerializerRequiringReconfiguration() {
+               TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
+
+               // register serializer that requires reconfiguration, and 
verify that
+               // the resulting current schema serializer is the reconfigured 
one
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
+                       testProvider.registerNewSerializerForRestoredState(new 
TestType.ReconfigurationRequiringTestTypeSerializer());
+               
assertTrue(schemaCompatibility.isCompatibleWithReconfiguredSerializer());
+               assertTrue(testProvider.currentSchemaSerializer().getClass() == 
TestType.V1TestTypeSerializer.class);
        }
 
        @Test
-       public void testRegisterIncompatibleSerializer() {
+       public void testLazilyRegisterIncompatibleSerializer() {
                TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
-               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
 
                // register serializer that requires migration for state
                TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
@@ -148,6 +164,88 @@ public void testRegisterIncompatibleSerializer() {
                try {
                        // a serializer for the current schema will no longer 
be accessible
                        testProvider.currentSchemaSerializer();
+
+                       fail();
+               } catch (Exception excepted) {
+                       // success
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------
+       //  Tests for 
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)
+       // 
--------------------------------------------------------------------------------
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void 
testSetSerializerSnapshotWithLazilyRegisteredSerializerProviderShouldFail() {
+               TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
+
+               
testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void 
testSetSerializerSnapshotTwiceWithEagerlyRegisteredSerializerProviderShouldFail()
 {
+               TestType.V1TestTypeSerializer serializer = new 
TestType.V1TestTypeSerializer();
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(serializer);
+
+               
testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+
+               // second registration should fail
+               
testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+       }
+
+       @Test
+       public void testEagerlyRegisterNewCompatibleAsIsSerializer() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(new 
TestType.V1TestTypeSerializer());
+
+               // set previous serializer snapshot for state, which should let 
the new serializer be considered compatible as is
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
+                       
testProvider.setPreviousSerializerSnapshotForRestoredState(new 
TestType.V1TestTypeSerializer().snapshotConfiguration());
+               assertTrue(schemaCompatibility.isCompatibleAsIs());
+
+               assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+               assertTrue(testProvider.previousSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testEagerlyRegisterCompatibleAfterMigrationSerializer() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(new 
TestType.V2TestTypeSerializer());
+
+               // set previous serializer snapshot for state, which should let 
the new serializer be considered compatible after migration
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
+                       
testProvider.setPreviousSerializerSnapshotForRestoredState(new 
TestType.V1TestTypeSerializer().snapshotConfiguration());
+               assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+
+               assertTrue(testProvider.currentSchemaSerializer() instanceof 
TestType.V2TestTypeSerializer);
+               assertTrue(testProvider.previousSchemaSerializer() instanceof 
TestType.V1TestTypeSerializer);
+       }
+
+       @Test
+       public void testEagerlyRegisterNewSerializerRequiringReconfiguration() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(new 
TestType.ReconfigurationRequiringTestTypeSerializer());
+
+               // set previous serializer snapshot, which should let the new 
serializer be considered to require reconfiguration,
+               // and verify that the resulting current schema serializer is 
the reconfigured one
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
+                       
testProvider.setPreviousSerializerSnapshotForRestoredState(new 
TestType.V1TestTypeSerializer().snapshotConfiguration());
+               
assertTrue(schemaCompatibility.isCompatibleWithReconfiguredSerializer());
+               assertTrue(testProvider.currentSchemaSerializer().getClass() == 
TestType.V1TestTypeSerializer.class);
+       }
+
+       @Test
+       public void testEagerlyRegisterIncompatibleSerializer() {
+               StateSerializerProvider<TestType> testProvider = 
StateSerializerProvider.fromNewRegisteredSerializer(new 
TestType.IncompatibleTestTypeSerializer());
+
+               // set previous serializer snapshot for state, which should let 
the new serializer be considered incompatible
+               TypeSerializerSchemaCompatibility<TestType> schemaCompatibility 
=
+                       
testProvider.setPreviousSerializerSnapshotForRestoredState(new 
TestType.V1TestTypeSerializer().snapshotConfiguration());
+               assertTrue(schemaCompatibility.isIncompatible());
+
+               try {
+                       // a serializer for the current schema will no longer 
be accessible
+                       testProvider.currentSchemaSerializer();
+
+                       fail();
                } catch (Exception excepted) {
                        // success
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index e3b0a066e51..52d5521aae8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
@@ -148,6 +148,29 @@ public TestType deserialize(DataInputView source) throws 
IOException {
                }
        }
 
+       /**
+        * A serializer that is meant to be compatible with any of the 
serializers only after being reconfigured as a new instance.
+        */
+       public static class ReconfigurationRequiringTestTypeSerializer extends 
TestTypeSerializerBase {
+
+               private static final long serialVersionUID = 
-7254527815207212324L;
+
+               @Override
+               public void serialize(TestType record, DataOutputView target) 
throws IOException {
+                       throw new UnsupportedOperationException("The serializer 
should have been reconfigured as a new instance; shouldn't be used.");
+               }
+
+               @Override
+               public TestType deserialize(DataInputView source) throws 
IOException {
+                       throw new UnsupportedOperationException("The serializer 
should have been reconfigured as a new instance; shouldn't be used.");
+               }
+
+               @Override
+               public TypeSerializerSnapshot<TestType> snapshotConfiguration() 
{
+                       throw new UnsupportedOperationException("The serializer 
should have been reconfigured as a new instance; shouldn't be used.");
+               }
+       }
+
        /**
         * A serializer that is meant to be incompatible with any of the 
serializers.
         */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
index b2b802a30c2..9f78ce32fdd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
@@ -42,8 +42,13 @@ public int getCurrentVersion() {
                        return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
                } else if (newSerializer instanceof 
TestType.V2TestTypeSerializer) {
                        return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
-               } else {
+               } else if (newSerializer instanceof 
TestType.ReconfigurationRequiringTestTypeSerializer) {
+                       // we mimic the reconfiguration by just 
re-instantiating the correct serializer
+                       return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new 
TestType.V1TestTypeSerializer());
+               } else if (newSerializer instanceof 
TestType.IncompatibleTestTypeSerializer) {
                        return TypeSerializerSchemaCompatibility.incompatible();
+               } else {
+                       throw new IllegalStateException("Unknown serializer 
class for TestType.");
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
index 3cd4fff8d86..983a2ae88c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
@@ -40,8 +40,17 @@ public int getCurrentVersion() {
        public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(TypeSerializer<TestType> newSerializer) {
                if (newSerializer instanceof TestType.V2TestTypeSerializer) {
                        return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
-               } else {
+               } else if (newSerializer instanceof 
TestType.ReconfigurationRequiringTestTypeSerializer) {
+                       // we mimic the reconfiguration by just 
re-instantiating the correct serializer
+                       return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new 
TestType.V2TestTypeSerializer());
+               } else if (
+                       // migrating from V2 -> V1 is not supported
+                       newSerializer instanceof TestType.V1TestTypeSerializer
+                               || newSerializer instanceof 
TestType.IncompatibleTestTypeSerializer) {
+
                        return TypeSerializerSchemaCompatibility.incompatible();
+               } else {
+                       throw new IllegalStateException("Unknown serializer 
class for TestType.");
                }
        }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 700c5468c8c..5f914ff4645 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -336,7 +336,7 @@ private static void checkAndCreateDirectory(File directory) 
throws IOException {
 
                final TypeSerializer<N> namespaceSerializer = 
registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
                final DataOutputSerializer namespaceOutputView = new 
DataOutputSerializer(8);
-               boolean ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
+               boolean ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), 
namespaceSerializer);
                final byte[] nameSpaceBytes;
                try {
                        RocksDBKeySerializationUtils.writeNameSpace(
@@ -352,7 +352,7 @@ private static void checkAndCreateDirectory(File directory) 
throws IOException {
                RocksIteratorWrapper iterator = getRocksIterator(db, 
columnInfo.f0);
                iterator.seekToFirst();
 
-               final RocksStateKeysIterator<K> iteratorWrapper = new 
RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+               final RocksStateKeysIterator<K> iteratorWrapper = new 
RocksStateKeysIterator<>(iterator, state, getKeySerializer(), 
keyGroupPrefixBytes,
                        ambiguousKeyPossible, nameSpaceBytes);
 
                Stream<K> targetStream = 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, 
Spliterator.ORDERED), false);
@@ -536,7 +536,7 @@ void initializeSnapshotStrategy(
                        new RocksFullSnapshotStrategy<>(
                                db,
                                rocksDBResourceGuard,
-                               keySerializer,
+                               getKeySerializer(),
                                kvStateInformation,
                                keyGroupRange,
                                keyGroupPrefixBytes,
@@ -563,7 +563,7 @@ void initializeSnapshotStrategy(
                        this.checkpointSnapshotStrategy = new 
RocksIncrementalSnapshotStrategy<>(
                                db,
                                rocksDBResourceGuard,
-                               keySerializer,
+                               getKeySerializer(),
                                kvStateInformation,
                                keyGroupRange,
                                keyGroupPrefixBytes,
@@ -655,6 +655,8 @@ private RocksDB openDB(
                /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
                private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
 
+               private boolean isKeySerializerCompatibilityChecked;
+
                /**
                 * Creates a restore operation object for the given state 
backend instance.
                 *
@@ -720,11 +722,16 @@ private void restoreKVStateMetaData() throws IOException, 
StateMigrationExceptio
 
                        serializationProxy.read(currentStateHandleInView);
 
-                       // check for key serializer compatibility; this also 
reconfigures the
-                       // key serializer to be compatible, if it is required 
and is possible
-                       if (!serializationProxy.getKeySerializerConfigSnapshot()
-                                       
.resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySerializer).isCompatibleAsIs())
 {
-                               throw new StateMigrationException("The new key 
serializer must be compatible.");
+                       if (!isKeySerializerCompatibilityChecked) {
+                               // check for key serializer compatibility; this 
also reconfigures the
+                               // key serializer to be compatible, if it is 
required and is possible
+                               TypeSerializerSchemaCompatibility<K> 
keySerializerSchemaCompat =
+                                       
rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
+                               if 
(keySerializerSchemaCompat.isCompatibleAfterMigration() || 
keySerializerSchemaCompat.isIncompatible()) {
+                                       throw new StateMigrationException("The 
new key serializer must be compatible.");
+                               }
+
+                               isKeySerializerCompatibilityChecked = true;
                        }
 
                        this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
@@ -821,6 +828,7 @@ private void restoreKVStateData() throws IOException, 
RocksDBException {
                private final SortedMap<Long, Set<StateHandleID>> 
restoredSstFiles;
                private UUID restoredBackendUID;
                private long lastCompletedCheckpointId;
+               private boolean isKeySerializerCompatibilityChecked;
 
                private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
 
@@ -1265,11 +1273,16 @@ private void restoreInstanceDirectoryFromPath(Path 
source) throws IOException {
                                DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
                                serializationProxy.read(in);
 
-                               // check for key serializer compatibility; this 
also reconfigures the
-                               // key serializer to be compatible, if it is 
required and is possible
-                               if 
(!serializationProxy.getKeySerializerConfigSnapshot()
-                                               
.resolveSchemaCompatibility(stateBackend.keySerializer).isCompatibleAsIs()) {
-                                       throw new StateMigrationException("The 
new key serializer must be compatible.");
+                               if (!isKeySerializerCompatibilityChecked) {
+                                       // check for key serializer 
compatibility; this also reconfigures the
+                                       // key serializer to be compatible, if 
it is required and is possible
+                                       TypeSerializerSchemaCompatibility<T> 
keySerializerSchemaCompat =
+                                               
stateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
+                                       if 
(keySerializerSchemaCompat.isCompatibleAfterMigration() || 
keySerializerSchemaCompat.isIncompatible()) {
+                                               throw new 
StateMigrationException("The new key serializer must be compatible.");
+                                       }
+
+                                       isKeySerializerCompatibilityChecked = 
true;
                                }
 
                                return 
serializationProxy.getStateMetaInfoSnapshots();
@@ -1346,7 +1359,7 @@ private void restoreInstanceDirectoryFromPath(Path 
source) throws IOException {
                
restoredKvStateMetaInfo.updateSnapshotTransformer(snapshotTransformer);
 
                TypeSerializerSchemaCompatibility<N> s = 
restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
-               if (!s.isCompatibleAsIs()) {
+               if (s.isCompatibleAfterMigration() || s.isIncompatible()) {
                        throw new StateMigrationException("The new namespace 
serializer must be compatible.");
                }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to