http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 4712ed1..5459d53 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -18,29 +18,43 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * A test for the {@link PojoSerializer}.
  */
@@ -191,6 +205,20 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
                        return true;
                }
        }
+
+       public static class SubTestUserClassA extends TestUserClass {
+               public int subDumm1;
+               public String subDumm2;
+
+               public SubTestUserClassA() {}
+       }
+
+       public static class SubTestUserClassB extends TestUserClass {
+               public Double subDumm1;
+               public float subDumm2;
+
+               public SubTestUserClassB() {}
+       }
        
        /**
         * This tests if the hashes returned by the pojo and tuple comparators are the same
@@ -240,4 +268,244 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
                Assert.assertTrue("The hashing for tuples and pojos must be the 
same, so that they are mixable. Also for those with multiple key fields", 
multiPojoHash == multiTupleHash);
                
        }
-}      
+
+       // --------------------------------------------------------------------------------------------
+       // Configuration snapshotting & reconfiguring tests
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Verifies that reconfiguring with a config snapshot of a preceding POJO serializer
+        * with a different POJO type will result in INCOMPATIBLE (i.e. migration is required).
+        */
+       @Test
+       public void testReconfigureWithDifferentPojoType() throws Exception {
+               PojoSerializer<SubTestUserClassB> pojoSerializer1 = 
(PojoSerializer<SubTestUserClassB>)
+                       
TypeExtractor.getForClass(SubTestUserClassB.class).createSerializer(new 
ExecutionConfig());
+
+               // snapshot configuration and serialize to bytes
+               TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer1.snapshotConfiguration();
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       serializedConfig = out.toByteArray();
+               }
+
+               PojoSerializer<SubTestUserClassA> pojoSerializer2 = 
(PojoSerializer<SubTestUserClassA>)
+                       
TypeExtractor.getForClass(SubTestUserClassA.class).createSerializer(new 
ExecutionConfig());
+
+               // read configuration again from bytes
+               try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               CompatibilityResult<SubTestUserClassA> compatResult = 
pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
+               assertTrue(compatResult.requiresMigration());
+       }
+
+       /**
+        * Tests that reconfiguration correctly reorders subclass registrations to their previous order.
+        */
+       @Test
+       public void testReconfigureDifferentSubclassRegistrationOrder() throws 
Exception {
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.registerPojoType(SubTestUserClassA.class);
+               executionConfig.registerPojoType(SubTestUserClassB.class);
+
+               PojoSerializer<TestUserClass> pojoSerializer = 
(PojoSerializer<TestUserClass>) type.createSerializer(executionConfig);
+
+               // get original registration ids
+               int subClassATag = 
pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class);
+               int subClassBTag = 
pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class);
+
+               // snapshot configuration and serialize to bytes
+               TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer.snapshotConfiguration();
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       serializedConfig = out.toByteArray();
+               }
+
+               // use new config and instantiate new PojoSerializer
+               executionConfig = new ExecutionConfig();
+               executionConfig.registerPojoType(SubTestUserClassB.class); // 
test with B registered before A
+               executionConfig.registerPojoType(SubTestUserClassA.class);
+
+               pojoSerializer = (PojoSerializer<TestUserClass>) 
type.createSerializer(executionConfig);
+
+               // read configuration from bytes
+               try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+               assertTrue(!compatResult.requiresMigration());
+
+               // reconfigure - check reconfiguration result and that registration ids remain the same
+               //assertEquals(ReconfigureResult.COMPATIBLE, 
pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
+               assertEquals(subClassATag, 
pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue());
+               assertEquals(subClassBTag, 
pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue());
+       }
+
+       /**
+        * Tests that reconfiguration repopulates previously cached subclass serializers.
+        */
+       @Test
+       public void 
testReconfigureRepopulateNonregisteredSubclassSerializerCache() throws 
Exception {
+               // don't register any subclasses
+               PojoSerializer<TestUserClass> pojoSerializer = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+               // create cached serializers for SubTestUserClassA and 
SubTestUserClassB
+               pojoSerializer.getSubclassSerializer(SubTestUserClassA.class);
+               pojoSerializer.getSubclassSerializer(SubTestUserClassB.class);
+
+               assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+
+               // snapshot configuration and serialize to bytes
+               TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer.snapshotConfiguration();
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       serializedConfig = out.toByteArray();
+               }
+
+               // instantiate new PojoSerializer
+
+               pojoSerializer = (PojoSerializer<TestUserClass>) 
type.createSerializer(new ExecutionConfig());
+
+               // read configuration from bytes
+               try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               // reconfigure - check reconfiguration result and that subclass 
serializer cache is repopulated
+               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+               assertFalse(compatResult.requiresMigration());
+               assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+       }
+
+       /**
+        * Tests the scenario where:
+        *  - The previous Pojo serializer did not have registrations, and created cached serializers for subclasses
+        *  - On restore, the new serializer has those subclasses registered
+        *
+        * In this case, after reconfiguration, the cache should be repopulated, and registrations should
+        * also exist for the subclasses.
+        *
+        * Note: the cache still needs to be repopulated because previous data of those subclasses was
+        * written with the cached serializers. In this case, the repopulated cache has reconfigured serializers
+        * for the subclasses so that previously written data can be read, but the registered serializers
+        * for the subclasses do not necessarily need to be reconfigured since they will only be used to
+        * write new data.
+        */
+       @Test
+       public void testReconfigureWithPreviouslyNonregisteredSubclasses() 
throws Exception {
+               // don't register any subclasses at first
+               PojoSerializer<TestUserClass> pojoSerializer = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+               // create cached serializers for SubTestUserClassA and 
SubTestUserClassB
+               pojoSerializer.getSubclassSerializer(SubTestUserClassA.class);
+               pojoSerializer.getSubclassSerializer(SubTestUserClassB.class);
+
+               // make sure serializers are in cache
+               assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+
+               // make sure that registrations are empty
+               assertTrue(pojoSerializer.getRegisteredClasses().isEmpty());
+               assertEquals(0, 
pojoSerializer.getRegisteredSerializers().length);
+
+               // snapshot configuration and serialize to bytes
+               TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer.snapshotConfiguration();
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       serializedConfig = out.toByteArray();
+               }
+
+               // instantiate new PojoSerializer, with new execution config 
that has the subclass registrations
+               ExecutionConfig newExecutionConfig = new ExecutionConfig();
+               newExecutionConfig.registerPojoType(SubTestUserClassA.class);
+               newExecutionConfig.registerPojoType(SubTestUserClassB.class);
+               pojoSerializer = (PojoSerializer<TestUserClass>) 
type.createSerializer(newExecutionConfig);
+
+               // read configuration from bytes
+               try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               // reconfigure - check reconfiguration result and that
+               // 1) subclass serializer cache is repopulated
+               // 2) registrations also contain the now registered subclasses
+               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+               assertFalse(compatResult.requiresMigration());
+               assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+               
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+               assertEquals(2, pojoSerializer.getRegisteredClasses().size());
+               
assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class));
+               
assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class));
+       }
+
+       /**
+        * Verifies that reconfiguration reorders the fields of the new Pojo serializer to match the previous field order.
+        */
+       @Test
+       public void testReconfigureWithDifferentFieldOrder() throws Exception {
+               Field[] mockOriginalFieldOrder = {
+                       TestUserClass.class.getField("dumm4"),
+                       TestUserClass.class.getField("dumm3"),
+                       TestUserClass.class.getField("nestedClass"),
+                       TestUserClass.class.getField("dumm1"),
+                       TestUserClass.class.getField("dumm2"),
+                       TestUserClass.class.getField("dumm5"),
+               };
+
+               // creating this serializer just for generating config 
snapshots of the field serializers
+               PojoSerializer<TestUserClass> ser = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+               LinkedHashMap<Field, TypeSerializerConfigSnapshot> 
mockOriginalFieldToSerializerConfigSnapshot =
+                       new LinkedHashMap<>(mockOriginalFieldOrder.length);
+               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], 
ser.getFieldSerializers()[3].snapshotConfiguration());
+               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], 
ser.getFieldSerializers()[2].snapshotConfiguration());
+               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], 
ser.getFieldSerializers()[5].snapshotConfiguration());
+               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], 
ser.getFieldSerializers()[0].snapshotConfiguration());
+               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], 
ser.getFieldSerializers()[1].snapshotConfiguration());
+               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], 
ser.getFieldSerializers()[4].snapshotConfiguration());
+
+               PojoSerializer<TestUserClass> pojoSerializer = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+               assertEquals(TestUserClass.class.getField("dumm1"), 
pojoSerializer.getFields()[0]);
+               assertEquals(TestUserClass.class.getField("dumm2"), 
pojoSerializer.getFields()[1]);
+               assertEquals(TestUserClass.class.getField("dumm3"), 
pojoSerializer.getFields()[2]);
+               assertEquals(TestUserClass.class.getField("dumm4"), 
pojoSerializer.getFields()[3]);
+               assertEquals(TestUserClass.class.getField("dumm5"), 
pojoSerializer.getFields()[4]);
+               assertEquals(TestUserClass.class.getField("nestedClass"), 
pojoSerializer.getFields()[5]);
+
+               PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> 
mockPreviousConfigSnapshot =
+                       new PojoSerializer.PojoSerializerConfigSnapshot<>(
+                               TestUserClass.class,
+                               mockOriginalFieldToSerializerConfigSnapshot, // 
this mocks the previous field order
+                               new LinkedHashMap<Class<?>, 
TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test
+                               new HashMap<Class<?>, 
TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test
+
+               // reconfigure - check reconfiguration result and that fields 
are reordered to the previous order
+               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(
+
+                       mockPreviousConfigSnapshot);
+               assertFalse(compatResult.requiresMigration());
+               int i = 0;
+               for (Field field : mockOriginalFieldOrder) {
+                       assertEquals(field, pojoSerializer.getFields()[i]);
+                       i++;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
new file mode 100644
index 0000000..60c4dc4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests related to configuration snapshotting and reconfiguring for the {@link KryoSerializer}.
+ */
+public class KryoSerializerCompatibilityTest {
+
+       /**
+        * Verifies that the reconfiguration result is INCOMPATIBLE if the data type has changed.
+        */
+       @Test
+       public void testMigrationStrategyWithDifferentKryoType() throws 
Exception {
+               KryoSerializer<TestClassA> kryoSerializerForA = new 
KryoSerializer<>(TestClassA.class, new ExecutionConfig());
+
+               // snapshot configuration and serialize to bytes
+               TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = 
kryoSerializerForA.snapshotConfiguration();
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+                       serializedConfig = out.toByteArray();
+               }
+
+               KryoSerializer<TestClassB> kryoSerializerForB = new 
KryoSerializer<>(TestClassB.class, new ExecutionConfig());
+
+               // read configuration again from bytes
+               try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       kryoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               CompatibilityResult<TestClassB> compatResult = 
kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
+               assertTrue(compatResult.requiresMigration());
+       }
+
+       /**
+        * Tests that after reconfiguration, registration ids are reconfigured to
+        * remain the same as those of the preceding KryoSerializer.
+        */
+       @Test
+       public void testMigrationStrategyForDifferentRegistrationOrder() throws 
Exception {
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.registerKryoType(TestClassA.class);
+               executionConfig.registerKryoType(TestClassB.class);
+
+               KryoSerializer<TestClass> kryoSerializer = new 
KryoSerializer<>(TestClass.class, executionConfig);
+
+               // get original registration ids
+               int testClassId = 
kryoSerializer.getKryo().getRegistration(TestClass.class).getId();
+               int testClassAId = 
kryoSerializer.getKryo().getRegistration(TestClassA.class).getId();
+               int testClassBId = 
kryoSerializer.getKryo().getRegistration(TestClassB.class).getId();
+
+               // snapshot configuration and serialize to bytes
+               TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = 
kryoSerializer.snapshotConfiguration();
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+                       serializedConfig = out.toByteArray();
+               }
+
+               // use new config and instantiate new KryoSerializer
+               executionConfig = new ExecutionConfig();
+               executionConfig.registerKryoType(TestClassB.class); // test 
with B registered before A
+               executionConfig.registerKryoType(TestClassA.class);
+
+               kryoSerializer = new KryoSerializer<>(TestClass.class, 
executionConfig);
+
+               // read configuration from bytes
+               try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       kryoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               // reconfigure - check reconfiguration result and that registration ids remain the same
+               CompatibilityResult<TestClass> compatResult = 
kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
+               assertFalse(compatResult.requiresMigration());
+               assertEquals(testClassId, 
kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
+               assertEquals(testClassAId, 
kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
+               assertEquals(testClassBId, 
kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());
+       }
+
+       private static class TestClass {}
+
+       private static class TestClassA {}
+
+       private static class TestClassB {}
+
+       private static class TestClassBSerializer extends Serializer {
+               @Override
+               public void write(Kryo kryo, Output output, Object o) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Object read(Kryo kryo, Input input, Class aClass) {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 3bb40eb..4dabaca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -26,7 +26,9 @@ import static org.junit.Assert.fail;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.memory.DataInputView;
@@ -386,5 +388,15 @@ public class CollectionInputFormatTest {
                public int hashCode() {
                        return Objects.hash(failOnRead, failOnWrite);
                }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public CompatibilityResult<ElementType> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       throw new UnsupportedOperationException();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index 846b6c3..9e22fc2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.cep;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -39,7 +42,8 @@ import java.util.IdentityHashMap;
  *
  * @param <T> Type of the element to be serialized
  */
-public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
+@Internal
+public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
        private static final long serialVersionUID = -7633631762221447524L;
 
        // underlying type serializer
@@ -192,4 +196,14 @@ public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
                this.identityMap = new IdentityHashMap<>();
                this.elementList = new ArrayList<>();
        }
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               throw new UnsupportedOperationException("This serializer is not 
registered for managed state.");
+       }
+
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               throw new UnsupportedOperationException("This serializer is not 
registered for managed state.");
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index b8c4e65..70755e5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,6 +21,22 @@ package org.apache.flink.cep.nfa;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -43,20 +59,6 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.cep.NonDuplicatingTypeSerializer;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
 
 /**
  * Non-deterministic finite automaton implementation.
@@ -859,7 +861,7 @@ public class NFA<T> implements Serializable {
        /**
         * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
         */
-       public static class Serializer<T> extends TypeSerializer<NFA<T>> {
+       public static class Serializer<T> extends TypeSerializerSingleton<NFA<T>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -869,11 +871,6 @@ public class NFA<T> implements Serializable {
                }
 
                @Override
-               public TypeSerializer<NFA<T>> duplicate() {
-                       return this;
-               }
-
-               @Override
                public NFA<T> createInstance() {
                        return null;
                }
@@ -944,18 +941,8 @@ public class NFA<T> implements Serializable {
                }
 
                @Override
-               public boolean equals(Object obj) {
-                       return obj instanceof Serializer && ((Serializer) obj).canEqual(this);
-               }
-
-               @Override
                public boolean canEqual(Object obj) {
                        return obj instanceof Serializer;
                }
-
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b6374cd..14235dc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -21,7 +21,10 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -473,11 +476,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
                }
 
                @Override
-               public boolean canRestoreFrom(TypeSerializer<?> other) {
-                       return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
-               }
-
-               @Override
                public boolean equals(Object obj) {
                        if (obj instanceof PriorityQueueSerializer) {
                                @SuppressWarnings("unchecked")
@@ -498,6 +496,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
                public int hashCode() {
                        return Objects.hash(factory, elementSerializer);
                }
+
+               // --------------------------------------------------------------------------------------------
+               // Serializer configuration snapshotting & compatibility
+               // --------------------------------------------------------------------------------------------
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+               }
+
+               @Override
+               public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
+                               CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
+                                               ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+                               if (!compatResult.requiresMigration()) {
+                                       return CompatibilityResult.compatible();
+                               } else if (compatResult.getConvertDeserializer() != null) {
+                                       return CompatibilityResult.requiresMigration(
+                                               new PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory));
+                               }
+                       }
+
+                       return CompatibilityResult.requiresMigration(null);
+               }
        }
 
        private interface PriorityQueueFactory<T> extends Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
index b86fe87..5984122 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.types.valuearray;
 
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -82,4 +83,10 @@ public final class IntValueArraySerializer extends TypeSerializerSingleton<IntVa
        public boolean canEqual(Object obj) {
                return obj instanceof IntValueArraySerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+               return super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || identifier.equals(IntPrimitiveArraySerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
index 95219b6..e95a1a7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.types.valuearray;
 
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -82,4 +83,10 @@ public final class LongValueArraySerializer extends TypeSerializerSingleton<Long
        public boolean canEqual(Object obj) {
                return obj instanceof LongValueArraySerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+               return super.isCompatibleSerializationFormatIdentifier(identifier)
+                               || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
index 0e875e3..6dbe0e5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.types.valuearray;
 
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -82,4 +83,10 @@ public final class StringValueArraySerializer extends TypeSerializerSingleton<St
        public boolean canEqual(Object obj) {
                return obj instanceof StringValueArraySerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+               return super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || identifier.equals(StringArraySerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 1f56a98..7ffa57c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.runtime.types
 
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 import org.apache.flink.types.Row
 
@@ -75,4 +75,55 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   }
 
   override def hashCode: Int = rowSerializer.hashCode() * 13
+
+  // --------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // --------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+    new CRowSerializer.CRowSerializerConfigSnapshot(
+      rowSerializer.snapshotConfiguration())
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[CRow] = {
+
+    configSnapshot match {
+      case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot =>
+        val compatResult = rowSerializer.ensureCompatibility(
+            crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
+
+        if (compatResult.requiresMigration()) {
+          if (compatResult.getConvertDeserializer != null) {
+            CompatibilityResult.requiresMigration(
+              new CRowSerializer(compatResult.getConvertDeserializer)
+            )
+          } else {
+            CompatibilityResult.requiresMigration(null)
+          }
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object CRowSerializer {
+
+  class CRowSerializerConfigSnapshot(
+      private var rowSerializerConfigSnapshot: TypeSerializerConfigSnapshot)
+    extends CompositeTypeSerializerConfigSnapshot(rowSerializerConfigSnapshot) {
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    def this() = this(null)
+
+    override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
+  }
+
+  object CRowSerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
index c6813b6..c4e23ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.migration;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -51,8 +54,7 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab
 
        @Override
        public TypeSerializer<Serializable> duplicate() {
-               throw new UnsupportedOperationException(
-                               "This is just a proxy used during migration until the real type serializer is provided by the user.");
+               return this;
        }
 
        @Override
@@ -103,6 +105,17 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab
        }
 
        @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               return new ParameterlessTypeSerializerConfig(getClass().getCanonicalName());
+       }
+
+       @Override
+       public CompatibilityResult<Serializable> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               // always assume compatibility since we're just a proxy for migration
+               return CompatibilityResult.compatible();
+       }
+
+       @Override
        public boolean equals(Object obj) {
                return obj instanceof MigrationNamespaceSerializerProxy;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
index f47989a..f58070e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
@@ -85,14 +85,14 @@ public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements Migra
                        patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE;
                }
 
-               RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
-                               new RegisteredBackendStateMetaInfo<>(
+               RegisteredKeyedBackendStateMetaInfo<N, S> registeredKeyedBackendStateMetaInfo =
+                               new RegisteredKeyedBackendStateMetaInfo<>(
                                                StateDescriptor.Type.UNKNOWN,
                                                stateName,
                                                patchedNamespaceSerializer,
                                                stateSerializer);
 
-               final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+               final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredKeyedBackendStateMetaInfo);
                final DataInputView inView = openDataInputView();
                final int keyGroup = keyGroupRange.getStartKeyGroup();
                final int numNamespaces = inView.readInt();

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 0badb41..8fbc227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -132,4 +135,30 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
        public int hashCode() {
                return elementSerializer.hashCode();
        }
-}
\ No newline at end of file
+
+       // --------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
+                       CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
+                               ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+                       if (!compatResult.requiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else if (compatResult.getConvertDeserializer() != null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new ArrayListSerializer<>(compatResult.getConvertDeserializer()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index e7ed26f..ec4aa81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -207,26 +207,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
                                        final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                                                new HashMap<>(registeredStatesDeepCopies.size());
 
-                                       List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+                                       List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots =
                                                new ArrayList<>(registeredStatesDeepCopies.size());
 
-                                       for (Map.Entry<String, PartitionableListState<?>> entry :
-                                               registeredStatesDeepCopies.entrySet()) {
-
-                                               PartitionableListState<?> state = entry.getValue();
-                                               OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
-                                                       new OperatorBackendSerializationProxy.StateMetaInfo<>(
-                                                               state.getName(),
-                                                               state.getPartitionStateSerializer(),
-                                                               state.getAssignmentMode());
-                                               metaInfoList.add(metaInfo);
+                                       for (Map.Entry<String, PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) {
+                                               metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                                        }
 
                                        
CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
                                        DataOutputView dov = new 
DataOutputViewStreamWrapper(out);
 
                                        OperatorBackendSerializationProxy backendSerializationProxy =
-                                               new OperatorBackendSerializationProxy(metaInfoList);
+                                               new OperatorBackendSerializationProxy(metaInfoSnapshots);
 
                                        backendSerializationProxy.write(dov);
 
@@ -237,7 +229,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
                                                PartitionableListState<?> value = entry.getValue();
                                                long[] partitionOffsets = value.write(out);
-                                               OperatorStateHandle.Mode mode = value.getAssignmentMode();
+                                               OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                                                writtenStatesMetaData.put(
                                                        entry.getKey(),
                                                        new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
@@ -254,10 +246,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
                                                return null;
                                        }
 
-                                       OperatorStateHandle operatorStateHandle =
-                                               new OperatorStateHandle(writtenStatesMetaData, stateHandle);
-
-                                       return operatorStateHandle;
+                                       return new OperatorStateHandle(writtenStatesMetaData, stateHandle);
                                }
                        };
 
@@ -298,25 +287,23 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
                                backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
 
-                               List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
-                                               backendSerializationProxy.getNamedStateSerializationProxies();
+                               List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredMetaInfoSnapshots =
+                                               backendSerializationProxy.getStateMetaInfoSnapshots();
 
                                // Recreate all PartitionableListStates from the meta info
-                               for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) {
-                                       PartitionableListState<?> listState = registeredStates.get(stateMetaInfo.getName());
+                               for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) {
+                                       PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());
 
                                        if (null == listState) {
                                                listState = new PartitionableListState<>(
-                                                               stateMetaInfo.getName(),
-                                                               stateMetaInfo.getStateSerializer(),
-                                                               stateMetaInfo.getMode());
+                                                               new RegisteredOperatorBackendStateMetaInfo<>(
+                                                                               restoredMetaInfo.getName(),
+                                                                               restoredMetaInfo.getPartitionStateSerializer(),
+                                                                               restoredMetaInfo.getAssignmentMode()));
 
-                                               registeredStates.put(listState.getName(), listState);
+                                               registeredStates.put(listState.getStateMetaInfo().getName(), listState);
                                        } else {
-                                               Preconditions.checkState(listState.getPartitionStateSerializer().canRestoreFrom(
-                                                               stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
-                                                               listState.getPartitionStateSerializer() + " is not compatible with " +
-                                                               stateMetaInfo.getStateSerializer());
+                                               // TODO with eager state registration in place, check here for serializer migration strategies
                                        }
                                }
 
@@ -341,7 +328,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
        }
 
        /**
-        *
         * Implementation of operator list state.
         *
         * @param <S> the type of an operator state partition.
@@ -349,19 +335,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
        static final class PartitionableListState<S> implements ListState<S> {
 
                /**
-                * The name of the state, as registered by the user
-                */
-               private final String name;
-
-               /**
-                * The type serializer for the elements in the state list
-                */
-               private final TypeSerializer<S> partitionStateSerializer;
-
-               /**
-                * The mode how elements in this state are assigned to tasks during restore
+                * Meta information of the state, including state name, assignment mode, and serializer
                 */
-               private final OperatorStateHandle.Mode assignmentMode;
+               private final RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
 
                /**
                 * The internal list the holds the elements of the state
@@ -373,46 +349,26 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                 */
                private final ArrayListSerializer<S> internalListCopySerializer;
 
-               public PartitionableListState(
-                               String name,
-                               TypeSerializer<S> partitionStateSerializer,
-                               OperatorStateHandle.Mode assignmentMode) {
-
-                       this(name, partitionStateSerializer, assignmentMode, 
new ArrayList<S>());
+               public 
PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) 
{
+                       this(stateMetaInfo, new ArrayList<S>());
                }
 
                private PartitionableListState(
-                               String name,
-                               TypeSerializer<S> partitionStateSerializer,
-                               OperatorStateHandle.Mode assignmentMode,
+                               RegisteredOperatorBackendStateMetaInfo<S> 
stateMetaInfo,
                                ArrayList<S> internalList) {
 
-                       this.name = Preconditions.checkNotNull(name);
-                       this.partitionStateSerializer = 
Preconditions.checkNotNull(partitionStateSerializer);
-                       this.assignmentMode = 
Preconditions.checkNotNull(assignmentMode);
+                       this.stateMetaInfo = 
Preconditions.checkNotNull(stateMetaInfo);
                        this.internalList = 
Preconditions.checkNotNull(internalList);
-                       this.internalListCopySerializer = new 
ArrayListSerializer<>(partitionStateSerializer);
+                       this.internalListCopySerializer = new 
ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
                }
 
                private PartitionableListState(PartitionableListState<S> 
toCopy) {
 
-                       this(
-                                       toCopy.name,
-                                       
toCopy.partitionStateSerializer.duplicate(),
-                                       toCopy.assignmentMode,
-                                       
toCopy.internalListCopySerializer.copy(toCopy.internalList));
-               }
-
-               public String getName() {
-                       return name;
-               }
-
-               public OperatorStateHandle.Mode getAssignmentMode() {
-                       return assignmentMode;
+                       this(toCopy.stateMetaInfo, 
toCopy.internalListCopySerializer.copy(toCopy.internalList));
                }
 
-               public TypeSerializer<S> getPartitionStateSerializer() {
-                       return partitionStateSerializer;
+               public RegisteredOperatorBackendStateMetaInfo<S> 
getStateMetaInfo() {
+                       return stateMetaInfo;
                }
 
                public List<S> getInternalList() {
@@ -441,8 +397,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                @Override
                public String toString() {
                        return "PartitionableListState{" +
-                                       "name='" + name + '\'' +
-                                       ", assignmentMode=" + assignmentMode +
+                                       "stateMetaInfo=" + stateMetaInfo +
                                        ", internalList=" + internalList +
                                        '}';
                }
@@ -456,7 +411,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        for (int i = 0; i < internalList.size(); ++i) {
                                S element = internalList.get(i);
                                partitionOffsets[i] = out.getPos();
-                               partitionStateSerializer.serialize(element, 
dov);
+                               
getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
                        }
 
                        return partitionOffsets;
@@ -466,7 +421,6 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        private <S> ListState<S> getListState(
                ListStateDescriptor<S> stateDescriptor,
                OperatorStateHandle.Mode mode) throws IOException {
-
                Preconditions.checkNotNull(stateDescriptor);
 
                
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
@@ -478,23 +432,27 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                PartitionableListState<S> partitionableListState = 
(PartitionableListState<S>) registeredStates.get(name);
 
                if (null == partitionableListState) {
-
                        partitionableListState = new PartitionableListState<>(
-                               name,
-                               partitionStateSerializer,
-                               mode);
+                               new RegisteredOperatorBackendStateMetaInfo<>(
+                                       name,
+                                       partitionStateSerializer,
+                                       mode));
 
                        registeredStates.put(name, partitionableListState);
                } else {
+                       // TODO with eager registration in place, these checks 
should be moved to restore()
+
                        Preconditions.checkState(
-                               
partitionableListState.getAssignmentMode().equals(mode),
-                               "Incompatible assignment mode. Provided: " + 
mode + ", expected: " +
-                                       
partitionableListState.getAssignmentMode());
+                               
partitionableListState.getStateMetaInfo().getName().equals(name),
+                               "Incompatible state names. " +
+                                       "Was [" + 
partitionableListState.getStateMetaInfo().getName() + "], " +
+                                       "registered with [" + name + "].");
+
                        Preconditions.checkState(
-                               stateDescriptor.getElementSerializer().
-                                       
canRestoreFrom(partitionableListState.getPartitionStateSerializer()),
-                               "Incompatible type serializers. Provided: " + 
stateDescriptor.getElementSerializer() +
-                                       ", found: " + 
partitionableListState.getPartitionStateSerializer());
+                               
partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode),
+                               "Incompatible state assignment modes. " +
+                                       "Was [" + 
partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " +
+                                       "registered with [" + mode + "].");
                }
 
                return partitionableListState;
@@ -509,7 +467,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        long[] offsets = metaInfo.getOffsets();
                        if (null != offsets) {
                                DataInputView div = new 
DataInputViewStreamWrapper(in);
-                               TypeSerializer<S> serializer = 
stateListForName.getPartitionStateSerializer();
+                               TypeSerializer<S> serializer = 
stateListForName.getStateMetaInfo().getPartitionStateSerializer();
                                for (long offset : offsets) {
                                        in.seek(offset);
                                        
stateListForName.add(serializer.deserialize(div));

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index 61cc58c..d52c207 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -38,7 +42,8 @@ import java.util.Map;
  * @param <K> The type of the keys in the map.
  * @param <V> The type of the values in the map.
  */
-public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
+@Internal
+public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, 
V>> {
 
        private static final long serialVersionUID = -6885593032367050078L;
        
@@ -190,4 +195,37 @@ public class HashMapSerializer<K, V> extends 
TypeSerializer<HashMap<K, V>> {
        public int hashCode() {
                return keySerializer.hashCode() * 31 + 
valueSerializer.hashCode();
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               return new MapSerializerConfigSnapshot(
+                               keySerializer.snapshotConfiguration(),
+                               valueSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<HashMap<K, V>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof MapSerializerConfigSnapshot) {
+                       TypeSerializerConfigSnapshot[] 
keyValueSerializerConfigSnapshots =
+                               ((MapSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+
+                       CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
+                       CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
+
+                       if (!keyCompatResult.requiresMigration() && 
!valueCompatResult.requiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else if (keyCompatResult.getConvertDeserializer() != 
null && valueCompatResult.getConvertDeserializer() != null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new HashMapSerializer<>(
+                                               
keyCompatResult.getConvertDeserializer(),
+                                               
valueCompatResult.getConvertDeserializer()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 512baf6..d49b1d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -31,7 +31,7 @@ import java.io.Serializable;
 
 @SuppressWarnings("serial")
 @Internal
-final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+final class JavaSerializer<T extends Serializable> extends 
TypeSerializerSingleton<T> {
 
        private static final long serialVersionUID = 5067491650263321234L;
 
@@ -41,11 +41,6 @@ final class JavaSerializer<T extends Serializable> extends 
TypeSerializer<T> {
        }
 
        @Override
-       public TypeSerializer<T> duplicate() {
-               return this;
-       }
-
-       @Override
        public T createInstance() {
                return null;
        }
@@ -98,17 +93,7 @@ final class JavaSerializer<T extends Serializable> extends 
TypeSerializer<T> {
        }
 
        @Override
-       public boolean equals(Object obj) {
-               return obj instanceof JavaSerializer;
-       }
-
-       @Override
        public boolean canEqual(Object obj) {
                return obj instanceof JavaSerializer;
        }
-
-       @Override
-       public int hashCode() {
-               return getClass().hashCode();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 5661c38..a389c4f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -33,15 +31,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Serialization proxy for all meta data in keyed state backends. In the 
future we might also migrate the actual state
+ * Serialization proxy for all meta data in keyed state backends. In the 
future we might also migrate the actual state
  * serialization logic here.
  */
 public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable {
 
-       public static final int VERSION = 2;
+       public static final int VERSION = 3;
 
-       private TypeSerializerSerializationProxy<?> keySerializerProxy;
-       private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+       private TypeSerializer<?> keySerializer;
+       private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoSnapshots;
 
        private int restoredVersion;
        private ClassLoader userCodeClassLoader;
@@ -50,19 +48,25 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
        }
 
-       public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, 
List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
-               this.keySerializerProxy = new 
TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
-               this.namedStateSerializationProxies = 
Preconditions.checkNotNull(namedStateSerializationProxies);
+       public KeyedBackendSerializationProxy(
+                       TypeSerializer<?> keySerializer,
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
+
+               this.keySerializer = Preconditions.checkNotNull(keySerializer);
+
+               Preconditions.checkNotNull(stateMetaInfoSnapshots);
+               Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE);
+               this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+
                this.restoredVersion = VERSION;
-               
Preconditions.checkArgument(namedStateSerializationProxies.size() <= 
Short.MAX_VALUE);
        }
 
-       public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
-               return namedStateSerializationProxies;
+       public List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
getStateMetaInfoSnapshots() {
+               return stateMetaInfoSnapshots;
        }
 
-       public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
-               return keySerializerProxy;
+       public TypeSerializer<?> getKeySerializer() {
+               return keySerializer;
        }
 
        @Override
@@ -82,20 +86,22 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
 
        @Override
        public boolean isCompatibleVersion(int version) {
-               // we are compatible with version 2 (Flink 1.3.x) and version 1 
(Flink 1.2.x)
-               return super.isCompatibleVersion(version) || version == 1;
+               // we are compatible with version 3 (Flink 1.3.x) and version 1 
& 2 (Flink 1.2.x)
+               return super.isCompatibleVersion(version) || version == 2 || 
version == 1;
        }
 
        @Override
        public void write(DataOutputView out) throws IOException {
                super.write(out);
 
-               keySerializerProxy.write(out);
+               new 
TypeSerializerSerializationProxy<>(keySerializer).write(out);
 
-               out.writeShort(namedStateSerializationProxies.size());
+               out.writeShort(stateMetaInfoSnapshots.size());
 
-               for (StateMetaInfo<?, ?> kvState : 
namedStateSerializationProxies) {
-                       kvState.write(out);
+               for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
metaInfo : stateMetaInfoSnapshots) {
+                       KeyedBackendStateMetaInfoSnapshotReaderWriters
+                               .getWriterForVersion(VERSION, metaInfo)
+                               .writeStateMetaInfo(out);
                }
        }
 
@@ -103,132 +109,18 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
        public void read(DataInputView in) throws IOException {
                super.read(in);
 
-               keySerializerProxy = new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
+               final TypeSerializerSerializationProxy<?> keySerializerProxy =
+                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
                keySerializerProxy.read(in);
+               this.keySerializer = keySerializerProxy.getTypeSerializer();
 
                int numKvStates = in.readShort();
-               namedStateSerializationProxies = new ArrayList<>(numKvStates);
-               for (int i = 0; i < numKvStates; ++i) {
-                       StateMetaInfo<?, ?> stateSerializationProxy = new 
StateMetaInfo<>(userCodeClassLoader);
-                       stateSerializationProxy.read(in);
-                       
namedStateSerializationProxies.add(stateSerializationProxy);
-               }
-       }
-
-       
//----------------------------------------------------------------------------------------------------------------------
-
-       /**
-        * This is the serialization proxy for {@link 
RegisteredBackendStateMetaInfo} for a single registered state in a
-        * keyed backend.
-        */
-       public static class StateMetaInfo<N, S> implements IOReadableWritable {
-
-               private StateDescriptor.Type stateType;
-               private String stateName;
-               private TypeSerializerSerializationProxy<N> 
namespaceSerializerSerializationProxy;
-               private TypeSerializerSerializationProxy<S> 
stateSerializerSerializationProxy;
-
-               private ClassLoader userClassLoader;
-
-               StateMetaInfo(ClassLoader userClassLoader) {
-                       this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
-               }
-
-               public StateMetaInfo(
-                               StateDescriptor.Type stateType,
-                               String name,
-                               TypeSerializer<N> namespaceSerializer,
-                               TypeSerializer<S> stateSerializer) {
-
-                       this.stateType = Preconditions.checkNotNull(stateType);
-                       this.stateName = Preconditions.checkNotNull(name);
-                       this.namespaceSerializerSerializationProxy = new 
TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(namespaceSerializer));
-                       this.stateSerializerSerializationProxy = new 
TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(stateSerializer));
-               }
-
-               public StateDescriptor.Type getStateType() {
-                       return stateType;
-               }
-
-               public void setStateType(StateDescriptor.Type stateType) {
-                       this.stateType = stateType;
-               }
-
-               public String getStateName() {
-                       return stateName;
-               }
-
-               public void setStateName(String stateName) {
-                       this.stateName = stateName;
-               }
-
-               public TypeSerializerSerializationProxy<N> 
getNamespaceSerializerSerializationProxy() {
-                       return namespaceSerializerSerializationProxy;
-               }
-
-               public void 
setNamespaceSerializerSerializationProxy(TypeSerializerSerializationProxy<N> 
namespaceSerializerSerializationProxy) {
-                       this.namespaceSerializerSerializationProxy = 
namespaceSerializerSerializationProxy;
-               }
-
-               public TypeSerializerSerializationProxy<S> 
getStateSerializerSerializationProxy() {
-                       return stateSerializerSerializationProxy;
-               }
-
-               public void 
setStateSerializerSerializationProxy(TypeSerializerSerializationProxy<S> 
stateSerializerSerializationProxy) {
-                       this.stateSerializerSerializationProxy = 
stateSerializerSerializationProxy;
-               }
-
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       out.writeInt(getStateType().ordinal());
-                       out.writeUTF(getStateName());
-
-                       getNamespaceSerializerSerializationProxy().write(out);
-                       getStateSerializerSerializationProxy().write(out);
-               }
-
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       int enumOrdinal = in.readInt();
-                       
setStateType(StateDescriptor.Type.values()[enumOrdinal]);
-                       setStateName(in.readUTF());
-
-                       namespaceSerializerSerializationProxy = new 
TypeSerializerSerializationProxy<>(userClassLoader);
-                       namespaceSerializerSerializationProxy.read(in);
-
-                       stateSerializerSerializationProxy = new 
TypeSerializerSerializationProxy<>(userClassLoader);
-                       stateSerializerSerializationProxy.read(in);
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-
-                       StateMetaInfo<?, ?> that = (StateMetaInfo<?, ?>) o;
-
-                       if (!getStateName().equals(that.getStateName())) {
-                               return false;
-                       }
-
-                       if 
(!getNamespaceSerializerSerializationProxy().equals(that.getNamespaceSerializerSerializationProxy()))
 {
-                               return false;
-                       }
-
-                       return 
getStateSerializerSerializationProxy().equals(that.getStateSerializerSerializationProxy());
-               }
-
-               @Override
-               public int hashCode() {
-                       int result = getStateName().hashCode();
-                       result = 31 * result + 
getNamespaceSerializerSerializationProxy().hashCode();
-                       result = 31 * result + 
getStateSerializerSerializationProxy().hashCode();
-                       return result;
+               stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+               for (int i = 0; i < numKvStates; i++) {
+                       stateMetaInfoSnapshots.add(
+                               KeyedBackendStateMetaInfoSnapshotReaderWriters
+                                       .getReaderForVersion(restoredVersion, 
userCodeClassLoader)
+                                       .readStateMetaInfo(in));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
new file mode 100644
index 0000000..83aa335
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Readers and writers for different versions of the {@link RegisteredKeyedBackendStateMetaInfo.Snapshot}.
+ * Outdated formats are also kept here to document the history of the serialization format.
+ */
+public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
+
+       // -------------------------------------------------------------------------------
+       //  Writers
+       //   - v1: Flink 1.2.x
+       //   - v2: Flink 1.3.x
+       // -------------------------------------------------------------------------------
+
+       public static <N, S> KeyedBackendStateMetaInfoWriter 
getWriterForVersion(
+               int version, RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
stateMetaInfo) {
+
+               switch (version) {
+                       case 1:
+                       case 2:
+                               return new 
KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo);
+
+                       // current version
+                       case KeyedBackendSerializationProxy.VERSION:
+                               return new 
KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo);
+
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                                       "Unrecognized keyed 
backend state meta info writer version: " + version);
+               }
+       }
+
+       public interface KeyedBackendStateMetaInfoWriter {
+               void writeStateMetaInfo(DataOutputView out) throws IOException;
+       }
+
+       static abstract class AbstractKeyedBackendStateMetaInfoWriter<N, S> 
implements KeyedBackendStateMetaInfoWriter {
+
+               protected final RegisteredKeyedBackendStateMetaInfo.Snapshot<N, 
S> stateMetaInfo;
+
+               public 
AbstractKeyedBackendStateMetaInfoWriter(RegisteredKeyedBackendStateMetaInfo.Snapshot<N,
 S> stateMetaInfo) {
+                       this.stateMetaInfo = 
Preconditions.checkNotNull(stateMetaInfo);
+               }
+
+       }
+
+       static class KeyedBackendStateMetaInfoWriterV1V2<N, S> extends 
AbstractKeyedBackendStateMetaInfoWriter<N, S> {
+
+               public 
KeyedBackendStateMetaInfoWriterV1V2(RegisteredKeyedBackendStateMetaInfo.Snapshot<N,
 S> stateMetaInfo) {
+                       super(stateMetaInfo);
+               }
+
+               @Override
+               public void writeStateMetaInfo(DataOutputView out) throws 
IOException {
+                       out.writeInt(stateMetaInfo.getStateType().ordinal());
+                       out.writeUTF(stateMetaInfo.getName());
+
+                       new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(out);
+                       new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(out);
+               }
+       }
+
+       static class KeyedBackendStateMetaInfoWriterV3<N, S> extends 
AbstractKeyedBackendStateMetaInfoWriter<N, S> {
+
+               public 
KeyedBackendStateMetaInfoWriterV3(RegisteredKeyedBackendStateMetaInfo.Snapshot<N,
 S> stateMetaInfo) {
+                       super(stateMetaInfo);
+               }
+
+               @Override
+               public void writeStateMetaInfo(DataOutputView out) throws 
IOException {
+                       out.writeInt(stateMetaInfo.getStateType().ordinal());
+                       out.writeUTF(stateMetaInfo.getName());
+
+                       // write in a way that allows us to be fault-tolerant and skip blocks in the case of Java serialization failures
+                       try (
+                               ByteArrayOutputStreamWithPos outWithPos = new 
ByteArrayOutputStreamWithPos();
+                               DataOutputViewStreamWrapper outViewWrapper = 
new DataOutputViewStreamWrapper(outWithPos)) {
+
+                               new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(outViewWrapper);
+
+                               // write current offset, which represents the start offset of the state serializer
+                               out.writeInt(outWithPos.getPosition());
+                               new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(outViewWrapper);
+
+                               // write current offset, which represents the start offset of the configuration snapshots
+                               out.writeInt(outWithPos.getPosition());
+                               
TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, 
stateMetaInfo.getNamespaceSerializerConfigSnapshot());
+                               
TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, 
stateMetaInfo.getStateSerializerConfigSnapshot());
+
+                               // write total number of bytes and then flush
+                               out.writeInt(outWithPos.getPosition());
+                               out.write(outWithPos.getBuf(), 0, 
outWithPos.getPosition());
+                       }
+               }
+       }
+
+
+       // -------------------------------------------------------------------------------
+       //  Readers
+       //   - v1: Flink 1.2.x
+       //   - v2: Flink 1.3.x
+       // -------------------------------------------------------------------------------
+
+       public static KeyedBackendStateMetaInfoReader getReaderForVersion(
+                       int version, ClassLoader userCodeClassLoader) {
+
+               switch (version) {
+                       case 1:
+                       case 2:
+                               return new 
KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader);
+
+                       // current version
+                       case KeyedBackendSerializationProxy.VERSION:
+                               return new 
KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader);
+
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                                       "Unrecognized keyed 
backend state meta info reader version: " + version);
+               }
+       }
+
+       public interface KeyedBackendStateMetaInfoReader<N, S> {
+               RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
readStateMetaInfo(DataInputView in) throws IOException;
+       }
+
+       static abstract class AbstractKeyedBackendStateMetaInfoReader 
implements KeyedBackendStateMetaInfoReader {
+
+               protected final ClassLoader userCodeClassLoader;
+
+               public AbstractKeyedBackendStateMetaInfoReader(ClassLoader 
userCodeClassLoader) {
+                       this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
+               }
+
+       }
+
+       static class KeyedBackendStateMetaInfoReaderV1V2<N, S> extends 
AbstractKeyedBackendStateMetaInfoReader {
+
+               public KeyedBackendStateMetaInfoReaderV1V2(ClassLoader 
userCodeClassLoader) {
+                       super(userCodeClassLoader);
+               }
+
+               @Override
+               public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
readStateMetaInfo(DataInputView in) throws IOException {
+                       RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
metaInfo =
+                               new 
RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
+
+                       
metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
+                       metaInfo.setName(in.readUTF());
+
+                       final TypeSerializerSerializationProxy<N> 
namespaceSerializerProxy =
+                               new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
+                       namespaceSerializerProxy.read(in);
+                       
metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
+
+                       final TypeSerializerSerializationProxy<S> 
stateSerializerProxy =
+                               new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
+                       stateSerializerProxy.read(in);
+                       
metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
+
+                       // older versions do not contain the configuration snapshots
+                       metaInfo.setNamespaceSerializerConfigSnapshot(null);
+                       metaInfo.setStateSerializerConfigSnapshot(null);
+
+                       return metaInfo;
+               }
+       }
+
+       static class KeyedBackendStateMetaInfoReaderV3<N, S> extends 
AbstractKeyedBackendStateMetaInfoReader {
+
+               public KeyedBackendStateMetaInfoReaderV3(ClassLoader 
userCodeClassLoader) {
+                       super(userCodeClassLoader);
+               }
+
+               @Override
+               public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
readStateMetaInfo(DataInputView in) throws IOException {
+                       RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
metaInfo =
+                               new 
RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
+
+                       
metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
+                       metaInfo.setName(in.readUTF());
+
+                       // read offsets
+                       int stateSerializerStartOffset = in.readInt();
+                       int configSnapshotsStartOffset = in.readInt();
+
+                       int totalBytes = in.readInt();
+
+                       byte[] buffer = new byte[totalBytes];
+                       in.readFully(buffer);
+
+                       ByteArrayInputStreamWithPos inWithPos = new 
ByteArrayInputStreamWithPos(buffer);
+                       DataInputViewStreamWrapper inViewWrapper = new 
DataInputViewStreamWrapper(inWithPos);
+
+                       try {
+                               final TypeSerializerSerializationProxy<N> 
namespaceSerializerProxy =
+                                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
+                               namespaceSerializerProxy.read(inViewWrapper);
+                               
metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
+                       } catch (IOException e) {
+                               metaInfo.setNamespaceSerializer(null);
+                       }
+
+                       // make sure we start from the position of the state serializer bytes
+                       inWithPos.setPosition(stateSerializerStartOffset);
+                       try {
+                               final TypeSerializerSerializationProxy<S> 
stateSerializerProxy =
+                                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
+                               stateSerializerProxy.read(inViewWrapper);
+                               
metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
+                       } catch (IOException e) {
+                               metaInfo.setStateSerializer(null);
+                       }
+
+                       // make sure we start from the position of the config snapshot bytes
+                       inWithPos.setPosition(configSnapshotsStartOffset);
+                       metaInfo.setNamespaceSerializerConfigSnapshot(
+                               
TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, 
userCodeClassLoader));
+                       metaInfo.setStateSerializerConfigSnapshot(
+                               
TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, 
userCodeClassLoader));
+
+                       return metaInfo;
+               }
+       }
+}

Reply via email to