http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
index d571dcc..91d7aab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -18,15 +18,10 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -34,23 +29,29 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Serialization proxy for all meta data in operator state backends. In the 
future we might also migrate the actual state
+ * Serialization proxy for all meta data in operator state backends. In the 
future we might also requiresMigration the actual state
  * serialization logic here.
  */
 public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritable {
 
-       private static final int VERSION = 1;
+       public static final int VERSION = 2;
 
-       private List<StateMetaInfo<?>> namedStateSerializationProxies;
+       private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
stateMetaInfoSnapshots;
        private ClassLoader userCodeClassLoader;
 
+       private int restoredVersion;
+
        public OperatorBackendSerializationProxy(ClassLoader 
userCodeClassLoader) {
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
        }
 
-       public OperatorBackendSerializationProxy(List<StateMetaInfo<?>> 
namedStateSerializationProxies) {
-               this.namedStateSerializationProxies = 
Preconditions.checkNotNull(namedStateSerializationProxies);
-               
Preconditions.checkArgument(namedStateSerializationProxies.size() <= 
Short.MAX_VALUE);
+       public OperatorBackendSerializationProxy(
+                       
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
stateMetaInfoSnapshots) {
+
+               this.stateMetaInfoSnapshots = 
Preconditions.checkNotNull(stateMetaInfoSnapshots);
+               Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE);
+
+               this.restoredVersion = VERSION;
        }
 
        @Override
@@ -59,129 +60,44 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
        }
 
        @Override
-       public void write(DataOutputView out) throws IOException {
-               super.write(out);
-
-               out.writeShort(namedStateSerializationProxies.size());
-
-               for (StateMetaInfo<?> kvState : namedStateSerializationProxies) 
{
-                       kvState.write(out);
-               }
+       protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
+               super.resolveVersionRead(foundVersion);
+               this.restoredVersion = foundVersion;
        }
 
        @Override
-       public void read(DataInputView out) throws IOException {
-               super.read(out);
-
-               int numKvStates = out.readShort();
-               namedStateSerializationProxies = new ArrayList<>(numKvStates);
-               for (int i = 0; i < numKvStates; ++i) {
-                       StateMetaInfo<?> stateSerializationProxy = new 
StateMetaInfo<>(userCodeClassLoader);
-                       stateSerializationProxy.read(out);
-                       
namedStateSerializationProxies.add(stateSerializationProxy);
-               }
+       public boolean isCompatibleVersion(int version) {
+               // we are compatible with version 2 (Flink 1.3.x) and version 1 
(Flink 1.2.x)
+               return super.isCompatibleVersion(version) || version == 1;
        }
 
-       public List<StateMetaInfo<?>> getNamedStateSerializationProxies() {
-               return namedStateSerializationProxies;
-       }
-
-       
//----------------------------------------------------------------------------------------------------------------------
-
-       public static class StateMetaInfo<S> implements IOReadableWritable {
-
-               private String name;
-               private TypeSerializer<S> stateSerializer;
-               private OperatorStateHandle.Mode mode;
-
-               private ClassLoader userClassLoader;
-
-               @VisibleForTesting
-               public StateMetaInfo(ClassLoader userClassLoader) {
-                       this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
-               }
-
-               public StateMetaInfo(String name, TypeSerializer<S> 
stateSerializer, OperatorStateHandle.Mode mode) {
-                       this.name = Preconditions.checkNotNull(name);
-                       this.stateSerializer = 
Preconditions.checkNotNull(stateSerializer);
-                       this.mode = Preconditions.checkNotNull(mode);
-               }
-
-               public String getName() {
-                       return name;
-               }
-
-               public void setName(String name) {
-                       this.name = name;
-               }
-
-               public TypeSerializer<S> getStateSerializer() {
-                       return stateSerializer;
-               }
-
-               public void setStateSerializer(TypeSerializer<S> 
stateSerializer) {
-                       this.stateSerializer = stateSerializer;
-               }
-
-               public OperatorStateHandle.Mode getMode() {
-                       return mode;
-               }
-
-               public void setMode(OperatorStateHandle.Mode mode) {
-                       this.mode = mode;
-               }
-
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       out.writeUTF(getName());
-                       out.writeByte(getMode().ordinal());
-                       DataOutputViewStream dos = new 
DataOutputViewStream(out);
-                       InstantiationUtil.serializeObject(dos, 
getStateSerializer());
-               }
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               super.write(out);
 
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       setName(in.readUTF());
-                       
setMode(OperatorStateHandle.Mode.values()[in.readByte()]);
-                       DataInputViewStream dis = new DataInputViewStream(in);
-                       try {
-                               TypeSerializer<S> stateSerializer = 
InstantiationUtil.deserializeObject(dis, userClassLoader);
-                               setStateSerializer(stateSerializer);
-                       } catch (ClassNotFoundException exception) {
-                               throw new IOException(exception);
-                       }
+               out.writeShort(stateMetaInfoSnapshots.size());
+               for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> kvState 
: stateMetaInfoSnapshots) {
+                       OperatorBackendStateMetaInfoSnapshotReaderWriters
+                               .getWriterForVersion(VERSION, kvState)
+                               .writeStateMetaInfo(out);
                }
+       }
 
-               @Override
-               public boolean equals(Object o) {
-
-                       if (this == o) {
-                               return true;
-                       }
-
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-
-                       StateMetaInfo<?> metaInfo = (StateMetaInfo<?>) o;
-
-                       if (!getName().equals(metaInfo.getName())) {
-                               return false;
-                       }
-
-                       if 
(!getStateSerializer().equals(metaInfo.getStateSerializer())) {
-                               return false;
-                       }
-
-                       return getMode() == metaInfo.getMode();
+       @Override
+       public void read(DataInputView in) throws IOException {
+               super.read(in);
+
+               int numKvStates = in.readShort();
+               stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+               for (int i = 0; i < numKvStates; i++) {
+                       stateMetaInfoSnapshots.add(
+                               
OperatorBackendStateMetaInfoSnapshotReaderWriters
+                                       .getReaderForVersion(restoredVersion, 
userCodeClassLoader)
+                                       .readStateMetaInfo(in));
                }
+       }
 
-               @Override
-               public int hashCode() {
-                       int result = getName().hashCode();
-                       result = 31 * result + getStateSerializer().hashCode();
-                       result = 31 * result + getMode().hashCode();
-                       return result;
-               }
+       public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
getStateMetaInfoSnapshots() {
+               return stateMetaInfoSnapshots;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
new file mode 100644
index 0000000..9ab106b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Readers and writers for different versions of the {@link 
RegisteredOperatorBackendStateMetaInfo.Snapshot}.
+ * Outdated formats are also kept here for documentation of history backlog.
+ */
+public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
+
+       // 
-------------------------------------------------------------------------------
+       //  Writers
+       //   - v1: Flink 1.2.x
+       //   - v2: Flink 1.3.x
+       // 
-------------------------------------------------------------------------------
+
+       public static <S> OperatorBackendStateMetaInfoWriter 
getWriterForVersion(
+                       int version, 
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+
+               switch (version) {
+                       case 1:
+                               return new 
OperatorBackendStateMetaInfoWriterV1<>(stateMetaInfo);
+
+                       // current version
+                       case OperatorBackendSerializationProxy.VERSION:
+                               return new 
OperatorBackendStateMetaInfoWriterV2<>(stateMetaInfo);
+
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                               "Unrecognized operator backend 
state meta info writer version: " + version);
+               }
+       }
+
+       public interface OperatorBackendStateMetaInfoWriter {
+               void writeStateMetaInfo(DataOutputView out) throws IOException;
+       }
+
+       public static abstract class 
AbstractOperatorBackendStateMetaInfoWriter<S>
+                       implements OperatorBackendStateMetaInfoWriter {
+
+               protected final 
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo;
+
+               public 
AbstractOperatorBackendStateMetaInfoWriter(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>
 stateMetaInfo) {
+                       this.stateMetaInfo = 
Preconditions.checkNotNull(stateMetaInfo);
+               }
+       }
+
+       public static class OperatorBackendStateMetaInfoWriterV1<S> extends 
AbstractOperatorBackendStateMetaInfoWriter<S> {
+
+               public 
OperatorBackendStateMetaInfoWriterV1(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>
 stateMetaInfo) {
+                       super(stateMetaInfo);
+               }
+
+               @Override
+               public void writeStateMetaInfo(DataOutputView out) throws 
IOException {
+                       out.writeUTF(stateMetaInfo.getName());
+                       
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
+                       new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(out);
+               }
+       }
+
+       public static class OperatorBackendStateMetaInfoWriterV2<S> extends 
AbstractOperatorBackendStateMetaInfoWriter<S> {
+
+               public 
OperatorBackendStateMetaInfoWriterV2(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>
 stateMetaInfo) {
+                       super(stateMetaInfo);
+               }
+
+               @Override
+               public void writeStateMetaInfo(DataOutputView out) throws 
IOException {
+                       out.writeUTF(stateMetaInfo.getName());
+                       
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
+
+                       // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
+                       try (
+                               ByteArrayOutputStreamWithPos outWithPos = new 
ByteArrayOutputStreamWithPos();
+                               DataOutputViewStreamWrapper outViewWrapper = 
new DataOutputViewStreamWrapper(outWithPos)) {
+
+                               new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(outViewWrapper);
+
+                               // write the start offset of the config snapshot
+                               out.writeInt(outWithPos.getPosition());
+                               
TypeSerializerUtil.writeSerializerConfigSnapshot(
+                                       outViewWrapper,
+                                       
stateMetaInfo.getPartitionStateSerializerConfigSnapshot());
+
+                               // write the total number of bytes and flush
+                               out.writeInt(outWithPos.getPosition());
+                               out.write(outWithPos.getBuf(), 0, 
outWithPos.getPosition());
+                       }
+               }
+       }
+
+       // 
-------------------------------------------------------------------------------
+       //  Readers
+       //   - v1: Flink 1.2.x
+       //   - v2: Flink 1.3.x
+       // 
-------------------------------------------------------------------------------
+
+       public static <S> OperatorBackendStateMetaInfoReader<S> 
getReaderForVersion(
+                       int version, ClassLoader userCodeClassLoader) {
+
+               switch (version) {
+                       case 1:
+                               return new 
OperatorBackendStateMetaInfoReaderV1<>(userCodeClassLoader);
+
+                       // current version
+                       case OperatorBackendSerializationProxy.VERSION:
+                               return new 
OperatorBackendStateMetaInfoReaderV2<>(userCodeClassLoader);
+
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                       "Unrecognized operator backend state 
meta info reader version: " + version);
+               }
+       }
+
+       public interface OperatorBackendStateMetaInfoReader<S> {
+               RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException;
+       }
+
+       public static abstract class 
AbstractOperatorBackendStateMetaInfoReader<S>
+               implements OperatorBackendStateMetaInfoReader<S> {
+
+               protected final ClassLoader userCodeClassLoader;
+
+               public AbstractOperatorBackendStateMetaInfoReader(ClassLoader 
userCodeClassLoader) {
+                       this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
+               }
+       }
+
+       public static class OperatorBackendStateMetaInfoReaderV1<S> extends 
AbstractOperatorBackendStateMetaInfoReader<S> {
+
+               public OperatorBackendStateMetaInfoReaderV1(ClassLoader 
userCodeClassLoader) {
+                       super(userCodeClassLoader);
+               }
+
+               @Override
+               public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException {
+                       RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
stateMetaInfo =
+                               new 
RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
+
+                       stateMetaInfo.setName(in.readUTF());
+                       
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+                       DataInputViewStream dis = new DataInputViewStream(in);
+                       try {
+                               TypeSerializer<S> stateSerializer = 
InstantiationUtil.deserializeObject(dis, userCodeClassLoader);
+                               
stateMetaInfo.setPartitionStateSerializer(stateSerializer);
+                       } catch (ClassNotFoundException exception) {
+                               throw new IOException(exception);
+                       }
+
+                       // old versions do not contain the partition state 
serializer's configuration snapshot
+                       
stateMetaInfo.setPartitionStateSerializerConfigSnapshot(null);
+
+                       return stateMetaInfo;
+               }
+       }
+
+       public static class OperatorBackendStateMetaInfoReaderV2<S> extends 
AbstractOperatorBackendStateMetaInfoReader<S> {
+
+               public OperatorBackendStateMetaInfoReaderV2(ClassLoader 
userCodeClassLoader) {
+                       super(userCodeClassLoader);
+               }
+
+               @Override
+               public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException {
+                       RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
stateMetaInfo =
+                               new 
RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
+
+                       stateMetaInfo.setName(in.readUTF());
+                       
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+                       // read start offset of configuration snapshot
+                       int configSnapshotStartOffset = in.readInt();
+
+                       int totalBytes = in.readInt();
+
+                       byte[] buffer = new byte[totalBytes];
+                       in.readFully(buffer);
+
+                       ByteArrayInputStreamWithPos inWithPos = new 
ByteArrayInputStreamWithPos(buffer);
+                       DataInputViewStreamWrapper inViewWrapper = new 
DataInputViewStreamWrapper(inWithPos);
+
+                       try {
+                               final TypeSerializerSerializationProxy<S> 
partitionStateSerializerProxy =
+                                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
+                               
partitionStateSerializerProxy.read(inViewWrapper);
+                               
stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer());
+                       } catch (IOException e) {
+                               stateMetaInfo.setPartitionStateSerializer(null);
+                       }
+
+                       // make sure we start from the partition state 
serializer bytes position
+                       inWithPos.setPosition(configSnapshotStartOffset);
+                       stateMetaInfo.setPartitionStateSerializerConfigSnapshot(
+                               
TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, 
userCodeClassLoader));
+
+                       return stateMetaInfo;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
deleted file mode 100644
index 0d4b3c8..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Compound meta information for a registered state in a keyed state backend. 
This combines all serializers and the
- * state name.
- *
- * @param <N> Type of namespace
- * @param <S> Type of state value
- */
-public class RegisteredBackendStateMetaInfo<N, S> {
-
-       private final StateDescriptor.Type stateType;
-       private final String name;
-       private final TypeSerializer<N> namespaceSerializer;
-       private final TypeSerializer<S> stateSerializer;
-
-       public 
RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N, 
S> metaInfoProxy) {
-               this(
-                               metaInfoProxy.getStateType(),
-                               metaInfoProxy.getStateName(),
-                               
metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer(),
-                               
metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer());
-       }
-
-       public RegisteredBackendStateMetaInfo(
-                       StateDescriptor.Type stateType,
-                       String name,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<S> stateSerializer) {
-
-               this.stateType = checkNotNull(stateType);
-               this.name = checkNotNull(name);
-               this.namespaceSerializer = checkNotNull(namespaceSerializer);
-               this.stateSerializer = checkNotNull(stateSerializer);
-       }
-
-       public StateDescriptor.Type getStateType() {
-               return stateType;
-       }
-
-       public String getName() {
-               return name;
-       }
-
-       public TypeSerializer<N> getNamespaceSerializer() {
-               return namespaceSerializer;
-       }
-
-       public TypeSerializer<S> getStateSerializer() {
-               return stateSerializer;
-       }
-
-       public boolean canRestoreFrom(RegisteredBackendStateMetaInfo<?, ?> 
other) {
-
-               if (this == other) {
-                       return true;
-               }
-
-               if (null == other) {
-                       return false;
-               }
-
-               if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
-                               && 
!other.stateType.equals(StateDescriptor.Type.UNKNOWN)
-                               && !stateType.equals(other.stateType)) {
-                       return false;
-               }
-
-               if (!name.equals(other.getName())) {
-                       return false;
-               }
-
-               return (stateSerializer.canRestoreFrom(other.stateSerializer)) 
&&
-                               
(namespaceSerializer.canRestoreFrom(other.namespaceSerializer)
-                                               // we also check if there is 
just a migration proxy that should be replaced by any real serializer
-                                               || other.namespaceSerializer 
instanceof MigrationNamespaceSerializerProxy);
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               RegisteredBackendStateMetaInfo<?, ?> that = 
(RegisteredBackendStateMetaInfo<?, ?>) o;
-
-               if (!stateType.equals(that.stateType)) {
-                       return false;
-               }
-
-               if (!getName().equals(that.getName())) {
-                       return false;
-               }
-
-               return getStateSerializer().equals(that.getStateSerializer())
-                               && 
getNamespaceSerializer().equals(that.getNamespaceSerializer());
-       }
-
-       @Override
-       public String toString() {
-               return "RegisteredBackendStateMetaInfo{" +
-                               "stateType=" + stateType +
-                               ", name='" + name + '\'' +
-                               ", namespaceSerializer=" + namespaceSerializer +
-                               ", stateSerializer=" + stateSerializer +
-                               '}';
-       }
-
-       @Override
-       public int hashCode() {
-               int result = getName().hashCode();
-               result = 31 * result + getStateType().hashCode();
-               result = 31 * result + (getNamespaceSerializer() != null ? 
getNamespaceSerializer().hashCode() : 0);
-               result = 31 * result + (getStateSerializer() != null ? 
getStateSerializer().hashCode() : 0);
-               return result;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
new file mode 100644
index 0000000..e1a7e06
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. 
This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredKeyedBackendStateMetaInfo<N, S> {
+
+       private final StateDescriptor.Type stateType;
+       private final String name;
+       private final TypeSerializer<N> namespaceSerializer;
+       private final TypeSerializer<S> stateSerializer;
+
+       public RegisteredKeyedBackendStateMetaInfo(
+                       StateDescriptor.Type stateType,
+                       String name,
+                       TypeSerializer<N> namespaceSerializer,
+                       TypeSerializer<S> stateSerializer) {
+
+               this.stateType = checkNotNull(stateType);
+               this.name = checkNotNull(name);
+               this.namespaceSerializer = checkNotNull(namespaceSerializer);
+               this.stateSerializer = checkNotNull(stateSerializer);
+       }
+
+       public StateDescriptor.Type getStateType() {
+               return stateType;
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       public TypeSerializer<N> getNamespaceSerializer() {
+               return namespaceSerializer;
+       }
+
+       public TypeSerializer<S> getStateSerializer() {
+               return stateSerializer;
+       }
+
+       public Snapshot<N, S> snapshot() {
+               return new Snapshot<>(
+                       stateType,
+                       name,
+                       namespaceSerializer.duplicate(),
+                       stateSerializer.duplicate(),
+                       namespaceSerializer.snapshotConfiguration(),
+                       stateSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               RegisteredKeyedBackendStateMetaInfo<?, ?> that = 
(RegisteredKeyedBackendStateMetaInfo<?, ?>) o;
+
+               if (!stateType.equals(that.stateType)) {
+                       return false;
+               }
+
+               if (!getName().equals(that.getName())) {
+                       return false;
+               }
+
+               return getStateSerializer().equals(that.getStateSerializer())
+                               && 
getNamespaceSerializer().equals(that.getNamespaceSerializer());
+       }
+
+       @Override
+       public String toString() {
+               return "RegisteredKeyedBackendStateMetaInfo{" +
+                               "stateType=" + stateType +
+                               ", name='" + name + '\'' +
+                               ", namespaceSerializer=" + namespaceSerializer +
+                               ", stateSerializer=" + stateSerializer +
+                               '}';
+       }
+
+       @Override
+       public int hashCode() {
+               int result = getName().hashCode();
+               result = 31 * result + getStateType().hashCode();
+               result = 31 * result + getNamespaceSerializer().hashCode();
+               result = 31 * result + getStateSerializer().hashCode();
+               return result;
+       }
+
+       /**
+        * A consistent snapshot of a {@link 
RegisteredKeyedBackendStateMetaInfo}.
+        */
+       public static class Snapshot<N, S> {
+
+               private StateDescriptor.Type stateType;
+               private String name;
+               private TypeSerializer<N> namespaceSerializer;
+               private TypeSerializer<S> stateSerializer;
+               private TypeSerializerConfigSnapshot 
namespaceSerializerConfigSnapshot;
+               private TypeSerializerConfigSnapshot 
stateSerializerConfigSnapshot;
+
+               /** Empty constructor used when restoring the state meta info 
snapshot. */
+               Snapshot() {}
+
+               private Snapshot(
+                               StateDescriptor.Type stateType,
+                               String name,
+                               TypeSerializer<N> namespaceSerializer,
+                               TypeSerializer<S> stateSerializer,
+                               TypeSerializerConfigSnapshot 
namespaceSerializerConfigSnapshot,
+                               TypeSerializerConfigSnapshot 
stateSerializerConfigSnapshot) {
+
+                       this.stateType = Preconditions.checkNotNull(stateType);
+                       this.name = Preconditions.checkNotNull(name);
+                       this.namespaceSerializer = 
Preconditions.checkNotNull(namespaceSerializer);
+                       this.stateSerializer = 
Preconditions.checkNotNull(stateSerializer);
+                       this.namespaceSerializerConfigSnapshot = 
Preconditions.checkNotNull(namespaceSerializerConfigSnapshot);
+                       this.stateSerializerConfigSnapshot = 
Preconditions.checkNotNull(stateSerializerConfigSnapshot);
+               }
+
+               public StateDescriptor.Type getStateType() {
+                       return stateType;
+               }
+
+               void setStateType(StateDescriptor.Type stateType) {
+                       this.stateType = stateType;
+               }
+
+               public String getName() {
+                       return name;
+               }
+
+               void setName(String name) {
+                       this.name = name;
+               }
+
+               public TypeSerializer<N> getNamespaceSerializer() {
+                       return namespaceSerializer;
+               }
+
+               void setNamespaceSerializer(TypeSerializer<N> 
namespaceSerializer) {
+                       this.namespaceSerializer = namespaceSerializer;
+               }
+
+               public TypeSerializer<S> getStateSerializer() {
+                       return stateSerializer;
+               }
+
+               void setStateSerializer(TypeSerializer<S> stateSerializer) {
+                       this.stateSerializer = stateSerializer;
+               }
+
+               public TypeSerializerConfigSnapshot 
getNamespaceSerializerConfigSnapshot() {
+                       return namespaceSerializerConfigSnapshot;
+               }
+
+               void 
setNamespaceSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
namespaceSerializerConfigSnapshot) {
+                       this.namespaceSerializerConfigSnapshot = 
namespaceSerializerConfigSnapshot;
+               }
+
+               public TypeSerializerConfigSnapshot 
getStateSerializerConfigSnapshot() {
+                       return stateSerializerConfigSnapshot;
+               }
+
+               void 
setStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
stateSerializerConfigSnapshot) {
+                       this.stateSerializerConfigSnapshot = 
stateSerializerConfigSnapshot;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       Snapshot<?, ?> that = (Snapshot<?, ?>) o;
+
+                       if (!stateType.equals(that.stateType)) {
+                               return false;
+                       }
+
+                       if (!getName().equals(that.getName())) {
+                               return false;
+                       }
+
+                       // need to check for nulls because serializer and 
config snapshots may be null on restore
+                       return
+                               ((getStateSerializer() == null && 
that.getStateSerializer() == null)
+                                       || 
getStateSerializer().equals(that.getStateSerializer()))
+                               && ((getNamespaceSerializer() == null && 
that.getNamespaceSerializer() == null)
+                                       || 
getNamespaceSerializer().equals(that.getNamespaceSerializer()))
+                               && ((getNamespaceSerializerConfigSnapshot() == 
null && that.getNamespaceSerializerConfigSnapshot() == null)
+                                       || 
getNamespaceSerializerConfigSnapshot().equals(that.getNamespaceSerializerConfigSnapshot()))
+                               && ((getStateSerializerConfigSnapshot() == null 
&& that.getStateSerializerConfigSnapshot() == null)
+                                       || 
getStateSerializerConfigSnapshot().equals(that.getStateSerializerConfigSnapshot()));
+               }
+
+               @Override
+               public int hashCode() {
+                       // need to check for nulls because serializer and 
config snapshots may be null on restore
+                       int result = getName().hashCode();
+                       result = 31 * result + getStateType().hashCode();
+                       result = 31 * result + (getNamespaceSerializer() != 
null ? getNamespaceSerializer().hashCode() : 0);
+                       result = 31 * result + (getStateSerializer() != null ? 
getStateSerializer().hashCode() : 0);
+                       result = 31 * result + 
(getNamespaceSerializerConfigSnapshot() != null ? 
getNamespaceSerializerConfigSnapshot().hashCode() : 0);
+                       result = 31 * result + 
(getStateSerializerConfigSnapshot() != null ? 
getStateSerializerConfigSnapshot().hashCode() : 0);
+                       return result;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
new file mode 100644
index 0000000..b43fc9c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Compound meta information for a registered state in an operator state 
backend.
+ * This contains the state name, assignment mode, and state partition 
serializer.
+ *
+ * @param <S> Type of the state.
+ */
+public class RegisteredOperatorBackendStateMetaInfo<S> {
+
+       /**
+        * The name of the state, as registered by the user
+        */
+       private final String name;
+
+       /**
+        * The mode how elements in this state are assigned to tasks during 
restore
+        */
+       private final OperatorStateHandle.Mode assignmentMode;
+
+       /**
+        * The type serializer for the elements in the state list
+        */
+       private final TypeSerializer<S> partitionStateSerializer;
+
+       public RegisteredOperatorBackendStateMetaInfo(
+                       String name,
+                       TypeSerializer<S> partitionStateSerializer,
+                       OperatorStateHandle.Mode assignmentMode) {
+
+               this.name = Preconditions.checkNotNull(name);
+               this.partitionStateSerializer = 
Preconditions.checkNotNull(partitionStateSerializer);
+               this.assignmentMode = 
Preconditions.checkNotNull(assignmentMode);
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       public OperatorStateHandle.Mode getAssignmentMode() {
+               return assignmentMode;
+       }
+
+       public TypeSerializer<S> getPartitionStateSerializer() {
+               return partitionStateSerializer;
+       }
+
+       public Snapshot<S> snapshot() {
+               return new Snapshot<>(
+                       name,
+                       assignmentMode,
+                       partitionStateSerializer.duplicate(),
+                       partitionStateSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+
+               if (obj == null) {
+                       return false;
+               }
+
+               return (obj instanceof RegisteredOperatorBackendStateMetaInfo)
+                       && 
name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName())
+                       && 
assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) 
obj).getAssignmentMode())
+                       && 
partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) 
obj).getPartitionStateSerializer());
+       }
+
+       @Override
+       public int hashCode() {
+               int result = getName().hashCode();
+               result = 31 * result + getAssignmentMode().hashCode();
+               result = 31 * result + getPartitionStateSerializer().hashCode();
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "RegisteredOperatorBackendStateMetaInfo{" +
+                       "name='" + name + "\'" +
+                       ", assignmentMode=" + assignmentMode +
+                       ", partitionStateSerializer=" + 
partitionStateSerializer +
+                       '}';
+       }
+
+       /**
+        * A consistent snapshot of a {@link 
RegisteredOperatorBackendStateMetaInfo}.
+        */
+       public static class Snapshot<S> {
+
+               private String name;
+               private OperatorStateHandle.Mode assignmentMode;
+               private TypeSerializer<S> partitionStateSerializer;
+               private TypeSerializerConfigSnapshot 
partitionStateSerializerConfigSnapshot;
+
+               /** Empty constructor used when restoring the state meta info 
snapshot. */
+               Snapshot() {}
+
+               private Snapshot(
+                               String name,
+                               OperatorStateHandle.Mode assignmentMode,
+                               TypeSerializer<S> partitionStateSerializer,
+                               TypeSerializerConfigSnapshot 
partitionStateSerializerConfigSnapshot) {
+
+                       this.name = Preconditions.checkNotNull(name);
+                       this.assignmentMode = 
Preconditions.checkNotNull(assignmentMode);
+                       this.partitionStateSerializer = 
Preconditions.checkNotNull(partitionStateSerializer);
+                       this.partitionStateSerializerConfigSnapshot = 
Preconditions.checkNotNull(partitionStateSerializerConfigSnapshot);
+               }
+
+               public String getName() {
+                       return name;
+               }
+
+               void setName(String name) {
+                       this.name = name;
+               }
+
+               public OperatorStateHandle.Mode getAssignmentMode() {
+                       return assignmentMode;
+               }
+
+               void setAssignmentMode(OperatorStateHandle.Mode assignmentMode) 
{
+                       this.assignmentMode = assignmentMode;
+               }
+
+               public TypeSerializer<S> getPartitionStateSerializer() {
+                       return partitionStateSerializer;
+               }
+
+               void setPartitionStateSerializer(TypeSerializer<S> 
partitionStateSerializer) {
+                       this.partitionStateSerializer = 
partitionStateSerializer;
+               }
+
+               public TypeSerializerConfigSnapshot 
getPartitionStateSerializerConfigSnapshot() {
+                       return partitionStateSerializerConfigSnapshot;
+               }
+
+               void 
setPartitionStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
partitionStateSerializerConfigSnapshot) {
+                       this.partitionStateSerializerConfigSnapshot = 
partitionStateSerializerConfigSnapshot;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+
+                       if (obj == null) {
+                               return false;
+                       }
+
+                       // need to check for nulls because serializer and 
config snapshots may be null on restore
+                       return (obj instanceof Snapshot)
+                               && name.equals(((Snapshot) obj).getName())
+                               && assignmentMode.equals(((Snapshot) 
obj).getAssignmentMode())
+                               && ((partitionStateSerializer == null && 
((Snapshot) obj).getPartitionStateSerializer() == null)
+                                       || 
partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer()))
+                               && ((partitionStateSerializerConfigSnapshot == 
null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null)
+                                       || 
partitionStateSerializerConfigSnapshot.equals(((Snapshot) 
obj).getPartitionStateSerializerConfigSnapshot()));
+               }
+
+               @Override
+               public int hashCode() {
+                       // need to check for nulls because serializer and 
config snapshots may be null on restore
+                       int result = getName().hashCode();
+                       result = 31 * result + getAssignmentMode().hashCode();
+                       result = 31 * result + (getPartitionStateSerializer() 
!= null ? getPartitionStateSerializer().hashCode() : 0);
+                       result = 31 * result + 
(getPartitionStateSerializerConfigSnapshot() != null ? 
getPartitionStateSerializerConfigSnapshot().hashCode() : 0);
+                       return result;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
new file mode 100644
index 0000000..978f28d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * Utilities related to state migration, commonly used in the state backends.
+ */
+public class StateMigrationUtil {
+
+       /**
+        * Resolves the final compatibility result of two serializers by taking 
into account compound information,
+        * including the preceding serializer, the preceding serializer's 
configuration snapshot, and the new serializer.
+        *
+        * The final result is determined as follows:
+        *   1. If there is no configuration snapshot of the preceding 
serializer,
+        *      assumes the new serializer to be compatible.
+        *   2. Confront the configuration snapshot with the new serializer.
+        *   3. If the result is compatible, just return that as the result.
+        *   4. If not compatible and requires migration, check if the 
preceding serializer is valid.
+        *      If yes, use that as the convert deserializer for state 
migration.
+        *   5. If the preceding serializer is not valid, check if the result 
came with a convert deserializer.
+        *      If yes, use that for state migration and simply return the 
result.
+        *   6. If all of above fails, state migration is required but could 
not be performed; throw exception.
+        *
+        * @param precedingSerializer the preceding serializer used to write 
the data
+        * @param dummySerializerClassTag any class tags that identifies the 
preceding serializer as a dummy placeholder
+        * @param precedingSerializerConfigSnapshot configuration snapshot of 
the preceding serializer
+        * @param newSerializer the new serializer to ensure compatibility with
+        *
+        * @param <T> Type of the data handled by the serializers
+        *
+        * @return the final resolved compatiblity result
+        */
+       public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+                       TypeSerializer<T> precedingSerializer,
+                       Class<?> dummySerializerClassTag,
+                       TypeSerializerConfigSnapshot 
precedingSerializerConfigSnapshot,
+                       TypeSerializer<T> newSerializer) {
+
+               if (precedingSerializerConfigSnapshot != null) {
+                       CompatibilityResult<T> initialResult = 
newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+
+                       if (!initialResult.requiresMigration()) {
+                               return initialResult;
+                       } else {
+                               if (precedingSerializer != null && 
!(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
+                                       // if the preceding serializer exists 
and is not a dummy, use
+                                       // that for converting instead of the 
provided convert deserializer
+                                       return 
CompatibilityResult.requiresMigration(precedingSerializer);
+                               } else if 
(initialResult.getConvertDeserializer() != null) {
+                                       return initialResult;
+                               } else {
+                                       throw new RuntimeException(
+                                               "State migration required, but 
there is no available serializer capable of reading previous data.");
+                               }
+                       }
+               } else {
+                       // if the configuration snapshot of the preceding 
serializer cannot be provided,
+                       // we can only simply assume that the new serializer is 
compatible
+                       return CompatibilityResult.compatible();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
index 8b58891..2800899 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 
 import java.io.IOException;
 
@@ -89,4 +90,11 @@ public final class VoidNamespaceSerializer extends 
TypeSerializerSingleton<VoidN
        public boolean canEqual(Object obj) {
                return obj instanceof VoidNamespaceSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               // we might be replacing a migration namespace serializer, in 
which case we just assume compatibility
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(MigrationNamespaceSerializerProxy.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index d63b6d3..7b61da1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
@@ -196,7 +196,7 @@ public class CopyOnWriteStateTable<K, N, S> extends 
StateTable<K, N, S> implemen
         * @param keyContext the key context.
         * @param metaInfo   the meta information, including the type 
serializer for state copy-on-write.
         */
-       CopyOnWriteStateTable(InternalKeyContext<K> keyContext, 
RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+       CopyOnWriteStateTable(InternalKeyContext<K> keyContext, 
RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
                this(keyContext, metaInfo, 1024);
        }
 
@@ -209,7 +209,7 @@ public class CopyOnWriteStateTable<K, N, S> extends 
StateTable<K, N, S> implemen
         * @throws IllegalArgumentException when the capacity is less than zero.
         */
        @SuppressWarnings("unchecked")
-       private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, 
RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+       private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, 
RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo, int capacity) {
                super(keyContext, metaInfo);
 
                // initialized tables to EMPTY_TABLE.
@@ -532,12 +532,12 @@ public class CopyOnWriteStateTable<K, N, S> extends 
StateTable<K, N, S> implemen
        }
 
        @Override
-       public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+       public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
                return metaInfo;
        }
 
        @Override
-       public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+       public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> 
metaInfo) {
                this.metaInfo = metaInfo;
        }
 
@@ -1063,4 +1063,4 @@ public class CopyOnWriteStateTable<K, N, S> extends 
StateTable<K, N, S> implemen
                        throw new UnsupportedOperationException("Read-only 
iterator");
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aecc72e..866ed28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -132,8 +132,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        TypeSerializer<N> namespaceSerializer,
                        TypeSerializer<V> valueSerializer) {
 
-               final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
-                               new RegisteredBackendStateMetaInfo<>(stateType, 
stateName, namespaceSerializer, valueSerializer);
+               final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
+                               new 
RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, 
namespaceSerializer, valueSerializer);
 
                @SuppressWarnings("unchecked")
                StateTable<K, N, V> stateTable = (StateTable<K, N, V>) 
stateTables.get(stateName);
@@ -142,12 +142,27 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        stateTable = newStateTable(newMetaInfo);
                        stateTables.put(stateName, stateTable);
                } else {
-                       if 
(!newMetaInfo.canRestoreFrom(stateTable.getMetaInfo())) {
-                               throw new RuntimeException("Trying to access 
state using incompatible meta info, was " +
-                                               stateTable.getMetaInfo() + " 
trying access with " + newMetaInfo);
+                       // TODO with eager registration in place, these checks 
should be moved to restorePartitionedState()
+
+                       Preconditions.checkState(
+                               
stateName.equals(stateTable.getMetaInfo().getName()),
+                               "Incompatible state names. " +
+                                       "Was [" + 
stateTable.getMetaInfo().getName() + "], " +
+                                       "registered with [" + 
newMetaInfo.getName() + "].");
+
+                       if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+                                       && 
!stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+                               Preconditions.checkState(
+                                       
newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
+                                       "Incompatible state types. " +
+                                               "Was [" + 
stateTable.getMetaInfo().getStateType() + "], " +
+                                               "registered with [" + 
newMetaInfo.getStateType() + "].");
                        }
+
                        stateTable.setMetaInfo(newMetaInfo);
                }
+
                return stateTable;
        }
 
@@ -240,21 +255,14 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                "Too many KV-States: " + stateTables.size() +
                                                ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
 
-               List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> 
metaInfoProxyList = new ArrayList<>(stateTables.size());
+               List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
metaInfoSnapshots = new ArrayList<>(stateTables.size());
 
                final Map<String, Integer> kVStateToId = new 
HashMap<>(stateTables.size());
 
                final Map<StateTable<K, ?, ?>, StateTableSnapshot> 
cowStateStableSnapshots = new HashedMap(stateTables.size());
 
                for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
-                       RegisteredBackendStateMetaInfo<?, ?> metaInfo = 
kvState.getValue().getMetaInfo();
-                       KeyedBackendSerializationProxy.StateMetaInfo<?, ?> 
metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
-                                       metaInfo.getStateType(),
-                                       metaInfo.getName(),
-                                       metaInfo.getNamespaceSerializer(),
-                                       metaInfo.getStateSerializer());
-
-                       metaInfoProxyList.add(metaInfoProxy);
+                       
metaInfoSnapshots.add(kvState.getValue().getMetaInfo().snapshot());
                        kVStateToId.put(kvState.getKey(), kVStateToId.size());
                        StateTable<K, ?, ?> stateTable = kvState.getValue();
                        if (null != stateTable) {
@@ -263,7 +271,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                final KeyedBackendSerializationProxy serializationProxy =
-                               new 
KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+                               new 
KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
 
                //--------------------------------------------------- this 
becomes the end of sync part
 
@@ -376,23 +384,29 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                serializationProxy.read(inView);
 
-                               
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
-                                               
serializationProxy.getNamedStateSerializationProxies();
+                               
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
+                                               
serializationProxy.getStateMetaInfoSnapshots();
 
-                               for 
(KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy 
: metaInfoList) {
+                               for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : 
restoredMetaInfos) {
 
-                                       StateTable<K, ?, ?> stateTable = 
stateTables.get(metaInfoSerializationProxy.getStateName());
+                                       StateTable<K, ?, ?> stateTable = 
stateTables.get(restoredMetaInfo.getName());
 
                                        //important: only create a new table we 
did not already create it previously
                                        if (null == stateTable) {
 
-                                               
RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
-                                                               new 
RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+                                               
RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
+                                                               new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                                                       
restoredMetaInfo.getStateType(),
+                                                                       
restoredMetaInfo.getName(),
+                                                                       
restoredMetaInfo.getNamespaceSerializer(),
+                                                                       
restoredMetaInfo.getStateSerializer());
 
-                                               stateTable = 
newStateTable(registeredBackendStateMetaInfo);
-                                               
stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
-                                               
kvStatesById.put(numRegisteredKvStates, 
metaInfoSerializationProxy.getStateName());
+                                               stateTable = 
newStateTable(registeredKeyedBackendStateMetaInfo);
+                                               
stateTables.put(restoredMetaInfo.getName(), stateTable);
+                                               
kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
                                                ++numRegisteredKvStates;
+                                       } else {
+                                               // TODO with eager state 
registration in place, check here for serializer migration strategies
                                        }
                                }
 
@@ -410,7 +424,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        
Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
                                                        "Unexpected key-group 
in restore.");
 
-                                       for (int i = 0; i < 
metaInfoList.size(); i++) {
+                                       for (int i = 0; i < 
restoredMetaInfos.size(); i++) {
                                                int kvStateId = 
inView.readShort();
                                                StateTable<K, ?, ?> stateTable 
= stateTables.get(kvStatesById.get(kvStateId));
 
@@ -509,7 +523,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return sum;
        }
 
-       public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+       public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
                return asynchronousSnapshots ?
                                new CopyOnWriteStateTable<>(this, newMetaInfo) :
                                new NestedMapsStateTable<>(this, newMetaInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 22f344d..75c31db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -63,7 +63,7 @@ public class NestedMapsStateTable<K, N, S> extends 
StateTable<K, N, S> {
         * @param keyContext the key context.
         * @param metaInfo the meta information for this state table.
         */
-       public NestedMapsStateTable(InternalKeyContext<K> keyContext, 
RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+       public NestedMapsStateTable(InternalKeyContext<K> keyContext, 
RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
                super(keyContext, metaInfo);
                this.keyGroupOffset = 
keyContext.getKeyGroupRange().getStartKeyGroup();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 62fc869..c1cdcc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -42,14 +42,14 @@ public abstract class StateTable<K, N, S> {
        /**
         * Combined meta information such as name and serializers for this state
         */
-       protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
+       protected RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo;
 
        /**
         *
         * @param keyContext the key context provides the key scope for all 
put/get/delete operations.
         * @param metaInfo the meta information, including the type serializer 
for state copy-on-write.
         */
-       public StateTable(InternalKeyContext<K> keyContext, 
RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+       public StateTable(InternalKeyContext<K> keyContext, 
RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
                this.keyContext = Preconditions.checkNotNull(keyContext);
                this.metaInfo = Preconditions.checkNotNull(metaInfo);
        }
@@ -168,11 +168,11 @@ public abstract class StateTable<K, N, S> {
                return metaInfo.getNamespaceSerializer();
        }
 
-       public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+       public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
                return metaInfo;
        }
 
-       public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+       public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> 
metaInfo) {
                this.metaInfo = metaInfo;
        }
 
@@ -186,4 +186,4 @@ public abstract class StateTable<K, N, S> {
 
        @VisibleForTesting
        public abstract int sizeOfNamespace(Object namespace);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index 53ec349..d7bc94e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -47,7 +47,8 @@ class StateTableByKeyGroupReaders {
                        case 1:
                                return new 
StateTableByKeyGroupReaderV1<>(table);
                        case 2:
-                               return new 
StateTableByKeyGroupReaderV2<>(table);
+                       case 3:
+                               return new 
StateTableByKeyGroupReaderV2V3<>(table);
                        default:
                                throw new IllegalArgumentException("Unknown 
version: " + version);
                }
@@ -110,10 +111,10 @@ class StateTableByKeyGroupReaders {
                }
        }
 
-       private static final class StateTableByKeyGroupReaderV2<K, N, S>
+       private static final class StateTableByKeyGroupReaderV2V3<K, N, S>
                        extends AbstractStateTableByKeyGroupReader<K, N, S> {
 
-               StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) {
+               StateTableByKeyGroupReaderV2V3(StateTable<K, N, S> stateTable) {
                        super(stateTable);
                }
 
@@ -133,4 +134,4 @@ class StateTableByKeyGroupReaders {
                        }
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index e872526..4bdc5e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -127,4 +129,14 @@ public class IntListSerializer extends 
TypeSerializer<IntList> {
        public int hashCode() {
                return IntListSerializer.class.hashCode();
        }
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompatibilityResult<IntList> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               throw new UnsupportedOperationException();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index e4a9264..0ae5e71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -136,4 +138,14 @@ public class IntPairSerializer extends 
TypeSerializer<IntPair> {
                        return obj.getClass() == IntPairSerializerFactory.class;
                };
        }
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompatibilityResult<IntPair> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               throw new UnsupportedOperationException();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index b62b097..17ee5f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
@@ -104,4 +106,14 @@ public class StringPairSerializer extends 
TypeSerializer<StringPair> {
        public int hashCode() {
                return StringPairSerializer.class.hashCode();
        }
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompatibilityResult<StringPair> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               throw new UnsupportedOperationException();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index b4d6eb7..8c4e049 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.query.netty.UnknownKvStateID;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapValueState;
@@ -270,7 +270,7 @@ public class QueryableStateClientTest {
                                ValueStateDescriptor<Integer> descriptor =
                                                new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 
-                               RegisteredBackendStateMetaInfo<VoidNamespace, 
Integer> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>(
+                               
RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> 
registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
                                                descriptor.getType(),
                                                descriptor.getName(),
                                                
VoidNamespaceSerializer.INSTANCE,
@@ -279,7 +279,7 @@ public class QueryableStateClientTest {
                                // Register state
                                HeapValueState<Integer, VoidNamespace, Integer> 
kvState = new HeapValueState<>(
                                                descriptor,
-                                               new 
NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, 
registeredBackendStateMetaInfo),
+                                               new 
NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, 
registeredKeyedBackendStateMetaInfo),
                                                IntSerializer.INSTANCE,
                                                
VoidNamespaceSerializer.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index c04ed8c..50ca159 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -97,7 +97,7 @@ public class OperatorStateBackendTest {
                assertEquals(2, 
operatorStateBackend.getRegisteredStateNames().size());
 
                // make sure that type registrations are forwarded
-               TypeSerializer<?> serializer = ((PartitionableListState<?>) 
listState).getPartitionStateSerializer();
+               TypeSerializer<?> serializer = ((PartitionableListState<?>) 
listState).getStateMetaInfo().getPartitionStateSerializer();
                assertTrue(serializer instanceof KryoSerializer);
                assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getSerializer(registeredType)
                                instanceof 
com.esotericsoftware.kryo.serializers.JavaSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 0dbe2eb..02b4d62 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -29,10 +30,21 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, 
OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
 public class SerializationProxiesTest {
 
        @Test
@@ -42,14 +54,14 @@ public class SerializationProxiesTest {
                TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> 
stateMetaInfoList = new ArrayList<>();
+               List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoList = new ArrayList<>();
 
-               stateMetaInfoList.add(
-                               new 
KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "a", 
namespaceSerializer, stateSerializer));
-               stateMetaInfoList.add(
-                               new 
KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "b", 
namespaceSerializer, stateSerializer));
-               stateMetaInfoList.add(
-                               new 
KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "c", 
namespaceSerializer, stateSerializer));
+               stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, "a", namespaceSerializer, 
stateSerializer).snapshot());
+               stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, "b", namespaceSerializer, 
stateSerializer).snapshot());
+               stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, "c", namespaceSerializer, 
stateSerializer).snapshot());
 
                KeyedBackendSerializationProxy serializationProxy =
                                new 
KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
@@ -67,8 +79,8 @@ public class SerializationProxiesTest {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
-               Assert.assertEquals(keySerializer, 
serializationProxy.getKeySerializerProxy().getTypeSerializer());
-               Assert.assertEquals(stateMetaInfoList, 
serializationProxy.getNamedStateSerializationProxies());
+               Assert.assertEquals(keySerializer, 
serializationProxy.getKeySerializer());
+               Assert.assertEquals(stateMetaInfoList, 
serializationProxy.getStateMetaInfoSnapshots());
        }
 
        @Test
@@ -78,41 +90,79 @@ public class SerializationProxiesTest {
                TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfo =
-                               new 
KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, 
name, namespaceSerializer, stateSerializer);
+               RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = 
new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, name, namespaceSerializer, 
stateSerializer).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       metaInfo.write(new DataOutputViewStreamWrapper(out));
+                       KeyedBackendStateMetaInfoSnapshotReaderWriters
+                               
.getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
+                               .writeStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
+
                        serialized = out.toByteArray();
                }
 
-               metaInfo = new 
KeyedBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader());
-
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       metaInfo.read(new DataInputViewStreamWrapper(in));
+                       metaInfo = 
KeyedBackendStateMetaInfoSnapshotReaderWriters
+                               
.getReaderForVersion(KeyedBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
+                               .readStateMetaInfo(new 
DataInputViewStreamWrapper(in));
                }
 
-               Assert.assertEquals(name, metaInfo.getStateName());
+               Assert.assertEquals(name, metaInfo.getName());
        }
 
+       @Test
+       public void testKeyedStateMetaInfoReadSerializerFailureResilience() 
throws Exception {
+               String name = "test";
+               TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+               TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+               RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = 
new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, name, namespaceSerializer, 
stateSerializer).snapshot();
+
+               byte[] serialized;
+               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
+                       KeyedBackendStateMetaInfoSnapshotReaderWriters
+                               
.getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
+                               .writeStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
+
+                       serialized = out.toByteArray();
+               }
+
+               // mock failure when deserializing serializer
+               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       metaInfo = 
KeyedBackendStateMetaInfoSnapshotReaderWriters
+                               
.getReaderForVersion(KeyedBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
+                               .readStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+               }
+
+               Assert.assertEquals(name, metaInfo.getName());
+               Assert.assertEquals(null, metaInfo.getNamespaceSerializer());
+               Assert.assertEquals(null, metaInfo.getStateSerializer());
+               
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
metaInfo.getNamespaceSerializerConfigSnapshot());
+               Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
metaInfo.getStateSerializerConfigSnapshot());
+       }
 
        @Test
        public void testOperatorBackendSerializationProxyRoundtrip() throws 
Exception {
 
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               List<OperatorBackendSerializationProxy.StateMetaInfo<?>> 
stateMetaInfoList = new ArrayList<>();
+               List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
stateMetaInfoSnapshots = new ArrayList<>();
 
-               stateMetaInfoList.add(
-                               new 
OperatorBackendSerializationProxy.StateMetaInfo<>("a", stateSerializer, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-               stateMetaInfoList.add(
-                               new 
OperatorBackendSerializationProxy.StateMetaInfo<>("b", stateSerializer, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-               stateMetaInfoList.add(
-                               new 
OperatorBackendSerializationProxy.StateMetaInfo<>("c", stateSerializer, 
OperatorStateHandle.Mode.BROADCAST));
+               stateMetaInfoSnapshots.add(new 
RegisteredOperatorBackendStateMetaInfo<>(
+                       "a", stateSerializer, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
+               stateMetaInfoSnapshots.add(new 
RegisteredOperatorBackendStateMetaInfo<>(
+                       "b", stateSerializer, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
+               stateMetaInfoSnapshots.add(new 
RegisteredOperatorBackendStateMetaInfo<>(
+                       "c", stateSerializer, 
OperatorStateHandle.Mode.BROADCAST).snapshot());
 
                OperatorBackendSerializationProxy serializationProxy =
-                               new 
OperatorBackendSerializationProxy(stateMetaInfoList);
+                               new 
OperatorBackendSerializationProxy(stateMetaInfoSnapshots);
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
@@ -127,7 +177,7 @@ public class SerializationProxiesTest {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
-               Assert.assertEquals(stateMetaInfoList, 
serializationProxy.getNamedStateSerializationProxies());
+               Assert.assertEquals(stateMetaInfoSnapshots, 
serializationProxy.getStateMetaInfoSnapshots());
        }
 
        @Test
@@ -136,22 +186,60 @@ public class SerializationProxiesTest {
                String name = "test";
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
-                               new 
OperatorBackendSerializationProxy.StateMetaInfo<>(name, stateSerializer, 
OperatorStateHandle.Mode.BROADCAST);
+               RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+                       new RegisteredOperatorBackendStateMetaInfo<>(
+                               name, stateSerializer, 
OperatorStateHandle.Mode.BROADCAST).snapshot();
+
+               byte[] serialized;
+               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
+                       OperatorBackendStateMetaInfoSnapshotReaderWriters
+                               
.getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo)
+                               .writeStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
+
+                       serialized = out.toByteArray();
+               }
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
+                               
.getReaderForVersion(OperatorBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
+                               .readStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+               }
+
+               Assert.assertEquals(name, metaInfo.getName());
+       }
+
+       @Test
+       public void testOperatorStateMetaInfoReadSerializerFailureResilience() 
throws Exception {
+               String name = "test";
+               TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+               RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+                       new RegisteredOperatorBackendStateMetaInfo<>(
+                               name, stateSerializer, 
OperatorStateHandle.Mode.BROADCAST).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       metaInfo.write(new DataOutputViewStreamWrapper(out));
+                       OperatorBackendStateMetaInfoSnapshotReaderWriters
+                               
.getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo)
+                               .writeStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
+
                        serialized = out.toByteArray();
                }
 
-               metaInfo = new 
OperatorBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader());
+               // mock failure when deserializing serializer
+               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       metaInfo.read(new DataInputViewStreamWrapper(in));
+                       metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
+                               
.getReaderForVersion(OperatorBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
+                               .readStateMetaInfo(new 
DataInputViewStreamWrapper(in));
                }
 
                Assert.assertEquals(name, metaInfo.getName());
+               Assert.assertEquals(null, 
metaInfo.getPartitionStateSerializer());
+               Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
metaInfo.getPartitionStateSerializerConfigSnapshot());
        }
 
        /**
@@ -171,4 +259,4 @@ public class SerializationProxiesTest {
                Assert.assertEquals(5, 
StateDescriptor.Type.AGGREGATING.ordinal());
                Assert.assertEquals(6, StateDescriptor.Type.MAP.ordinal());
        }
-}
\ No newline at end of file
+}

Reply via email to