http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 1a9c8f9..08da49e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -23,22 +23,25 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,22 +56,55 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
        private static final long serialVersionUID = 1L;
 
-       private final Class<T> clazz;
+       // 
--------------------------------------------------------------------------------------------
+       // PojoSerializer parameters
+       // 
--------------------------------------------------------------------------------------------
 
-       private final TypeSerializer<Object>[] fieldSerializers;
+       /** The POJO type class. */
+       private final Class<T> clazz;
 
+       /**
+        * Fields of the POJO and their serializers.
+        *
+        * <p>The fields are kept as a separate transient member, with their 
serialization
+        * handled with the {@link #readObject(ObjectInputStream)} and {@link 
#writeObject(ObjectOutputStream)}
+        * methods.
+        *
+        * <p>These may be reconfigured in {@link 
#ensureCompatibility(TypeSerializerConfigSnapshot)}.
+        */
+       private transient Field[] fields;
+       private TypeSerializer<Object>[] fieldSerializers;
        private final int numFields;
 
-       private final Map<Class<?>, Integer> registeredClasses;
-
-       private final TypeSerializer<?>[] registeredSerializers;
-
+       /**
+        * Registered subclasses and their serializers.
+        * The mapping from each subclass to its registered class tag is maintained as a 
separate map, ordered by class tag.
+        *
+        * <p>These may be reconfigured in {@link 
#ensureCompatibility(TypeSerializerConfigSnapshot)}.
+        */
+       private LinkedHashMap<Class<?>, Integer> registeredClasses;
+       private TypeSerializer<?>[] registeredSerializers;
+
+       /**
+        * Cache of non-registered subclasses to their serializers, created 
on-the-fly.
+        *
+        * <p>This cache is persisted and will be repopulated with reconfigured 
serializers
+        * in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+        */
+       private transient HashMap<Class<?>, TypeSerializer<?>> 
subclassSerializerCache;
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Configuration of the current execution.
+        *
+        * <p>Nested serializers created using this will have the most 
up-to-date configuration,
+        * and can be resolved for backwards compatibility with previous 
configuration
+        * snapshots in {@link 
#ensureCompatibility(TypeSerializerConfigSnapshot)}.
+        */
        private final ExecutionConfig executionConfig;
 
-       private transient Map<Class<?>, TypeSerializer<?>> 
subclassSerializerCache;
        private transient ClassLoader cl;
-       // We need to handle these ourselves in writeObject()/readObject()
-       private transient Field[] fields;
 
        @SuppressWarnings("unchecked")
        public PojoSerializer(
@@ -83,93 +119,20 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                this.numFields = fieldSerializers.length;
                this.executionConfig = checkNotNull(executionConfig);
 
-               LinkedHashSet<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
-
                for (int i = 0; i < numFields; i++) {
                        this.fields[i].setAccessible(true);
                }
 
                cl = Thread.currentThread().getContextClassLoader();
 
-               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer<?>>();
-
                // We only want those classes that are not our own class and 
are actually sub-classes.
-               List<Class<?>> cleanedTaggedClasses = new 
ArrayList<Class<?>>(registeredPojoTypes.size());
-               for (Class<?> registeredClass: registeredPojoTypes) {
-                       if (registeredClass.equals(clazz)) {
-                               continue;
-                       }
-                       if (!clazz.isAssignableFrom(registeredClass)) {
-                               continue;
-                       }
-                       cleanedTaggedClasses.add(registeredClass);
-
-               }
-               this.registeredClasses = new LinkedHashMap<Class<?>, 
Integer>(cleanedTaggedClasses.size());
-               registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+               LinkedHashSet<Class<?>> registeredSubclasses =
+                               
getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig);
 
-               int id = 0;
-               for (Class<?> registeredClass: cleanedTaggedClasses) {
-                       this.registeredClasses.put(registeredClass, id);
-                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
-                       registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+               this.registeredClasses = 
createRegisteredSubclassTags(registeredSubclasses);
+               this.registeredSerializers = 
createRegisteredSubclassSerializers(registeredSubclasses, executionConfig);
 
-                       id++;
-               }
-       }
-
-       private void writeObject(ObjectOutputStream out)
-                       throws IOException, ClassNotFoundException {
-               out.defaultWriteObject();
-               out.writeInt(fields.length);
-               for (Field field: fields) {
-                       FieldSerializer.serializeField(field, out);
-               }
-       }
-
-       private void readObject(ObjectInputStream in)
-                       throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               int numFields = in.readInt();
-               fields = new Field[numFields];
-               for (int i = 0; i < numFields; i++) {
-                       fields[i] = FieldSerializer.deserializeField(in);
-               }
-
-               cl = Thread.currentThread().getContextClassLoader();
-               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer<?>>();
-       }
-
-       private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
-               TypeSerializer<?> result = 
subclassSerializerCache.get(subclass);
-               if (result == null) {
-
-                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(subclass);
-                       result = typeInfo.createSerializer(executionConfig);
-                       if (result instanceof PojoSerializer) {
-                               PojoSerializer<?> subclassSerializer = 
(PojoSerializer<?>) result;
-                               subclassSerializer.copyBaseFieldOrder(this);
-                       }
-                       subclassSerializerCache.put(subclass, result);
-
-               }
-               return result;
-       }
-
-       @SuppressWarnings("unused")
-       private boolean hasField(Field f) {
-               for (Field field: fields) {
-                       if (f.equals(field)) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
-       private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
-               // do nothing for now, but in the future, adapt subclass 
serializer to have same
-               // ordering as base class serializer so that binary comparison 
on base class fields
-               // can work
+               this.subclassSerializerCache = new HashMap<>();
        }
        
        @Override
@@ -296,7 +259,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        }
                                }
                        } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields before.", e);
                        }
                } else {
                        TypeSerializer subclassSerializer = 
getSubclassSerializer(actualType);
@@ -341,13 +304,15 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                target.writeByte(flags);
 
+               // if it's a non-registered subclass, write the full class name; 
if it's a registered subclass, write the class tag id
                if ((flags & IS_SUBCLASS) != 0) {
                        target.writeUTF(actualClass.getName());
                } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
                        target.writeByte(subclassTag);
                }
 
-
+               // if it's a subclass, use the corresponding subclass serializer,
+               // otherwise serialize each field with our field serializers
                if ((flags & NO_SUBCLASS) != 0) {
                        try {
                                for (int i = 0; i < numFields; i++) {
@@ -360,8 +325,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        }
                                }
                        } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
-
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields before.", e);
                        }
                } else {
                        // subclass
@@ -418,8 +382,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        }
                                }
                        } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
-
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields before.", e);
                        }
                } else {
                        if (subclassSerializer != null) {
@@ -493,8 +456,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        }
                                }
                        } catch (IllegalAccessException e) {
-                               throw new RuntimeException(
-                                               "Error during POJO copy, this 
should not happen since we check the fields before.");
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields before.", e);
                        }
                } else {
                        if (subclassSerializer != null) {
@@ -574,4 +536,527 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
        public boolean canEqual(Object obj) {
                return obj instanceof PojoSerializer;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public PojoSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return buildConfigSnapshot(
+                               clazz,
+                               registeredClasses,
+                               registeredSerializers,
+                               fields,
+                               fieldSerializers,
+                               subclassSerializerCache);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
+                       final PojoSerializerConfigSnapshot<T> config = 
(PojoSerializerConfigSnapshot<T>) configSnapshot;
+
+                       if (clazz.equals(config.getTypeClass())) {
+                               if (this.numFields == 
config.getFieldToSerializerConfigSnapshot().size()) {
+
+                                       CompatibilityResult<?> compatResult;
+
+                                       // ----------- check field order and 
compatibility of field serializers -----------
+
+                                       // reordered fields and their 
serializers;
+                                       // this won't be applied to this 
serializer until all compatibility checks have been completed
+                                       final Field[] reorderedFields = new 
Field[this.numFields];
+                                       final TypeSerializer<Object>[] 
reorderedFieldSerializers =
+                                               (TypeSerializer<Object>[]) new 
TypeSerializer<?>[this.numFields];
+
+                                       int i = 0;
+                                       for (Map.Entry<Field, 
TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry
+                                                       : 
config.getFieldToSerializerConfigSnapshot().entrySet()) {
+
+                                               int fieldIndex = 
findField(fieldToConfigSnapshotEntry.getKey());
+                                               if (fieldIndex != -1) {
+                                                       reorderedFields[i] = 
fieldToConfigSnapshotEntry.getKey();
+
+                                                       compatResult = 
fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
+                                                       if 
(compatResult.requiresMigration()) {
+                                                               return 
CompatibilityResult.requiresMigration(null);
+                                                       } else {
+                                                               
reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
+                                                       }
+                                               } else {
+                                                       return 
CompatibilityResult.requiresMigration(null);
+                                               }
+
+                                               i++;
+                                       }
+
+                                       // ---- check subclass registration 
order and compatibility of registered serializers ----
+
+                                       // reordered subclass registrations and 
their serializers;
+                                       // this won't be applied to this 
serializer until all compatibility checks have been completed
+                                       final LinkedHashMap<Class<?>, Integer> 
reorderedRegisteredSubclassesToClasstags;
+                                       final TypeSerializer<?>[] 
reorderedRegisteredSubclassSerializers;
+
+                                       final LinkedHashMap<Class<?>, 
TypeSerializerConfigSnapshot> previousRegistrations =
+                                               
config.getRegisteredSubclassesToSerializerConfigSnapshots();
+
+                                       // the reconfigured list of registered 
subclasses will be the previous registered
+                                       // subclasses in the original order 
with new subclasses appended at the end
+                                       LinkedHashSet<Class<?>> 
reorderedRegisteredSubclasses = new LinkedHashSet<>();
+                                       
reorderedRegisteredSubclasses.addAll(previousRegistrations.keySet());
+                                       reorderedRegisteredSubclasses.addAll(
+                                               
getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig));
+
+                                       // re-establish the registered class 
tags and serializers
+                                       
reorderedRegisteredSubclassesToClasstags = 
createRegisteredSubclassTags(reorderedRegisteredSubclasses);
+                                       reorderedRegisteredSubclassSerializers 
= createRegisteredSubclassSerializers(
+                                               reorderedRegisteredSubclasses, 
executionConfig);
+
+                                       i = 0;
+                                       for (TypeSerializerConfigSnapshot 
previousRegisteredSerializerConfig : previousRegistrations.values()) {
+                                               // check compatibility of 
subclass serializer
+                                               compatResult = 
reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
+                                               if 
(compatResult.requiresMigration()) {
+                                                       return 
CompatibilityResult.requiresMigration(null);
+                                               }
+
+                                               i++;
+                                       }
+
+                                       // ------------------ ensure 
compatibility of non-registered subclass serializers ------------------
+
+                                       // the rebuilt cache for non-registered 
subclass serializers;
+                                       // this won't be applied to this 
serializer until all compatibility checks have been completed
+                                       HashMap<Class<?>, TypeSerializer<?>> 
rebuiltCache = new HashMap<>();
+
+                                       for (Map.Entry<Class<?>, 
TypeSerializerConfigSnapshot> previousCachedEntry
+                                                       : 
config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
+
+                                               TypeSerializer<?> 
cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey());
+
+                                               // check compatibility of 
cached subclass serializer
+                                               compatResult = 
cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
+                                               if 
(compatResult.requiresMigration()) {
+                                                       return 
CompatibilityResult.requiresMigration(null);
+                                               } else {
+                                                       
rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
+                                               }
+                                       }
+
+                                       // completed compatibility checks; at 
this point, we can just reconfigure
+                                       // the serializer so that it is 
compatible and migration is not required
+
+                                       this.fields = reorderedFields;
+                                       this.fieldSerializers = 
reorderedFieldSerializers;
+
+                                       this.registeredClasses = 
reorderedRegisteredSubclassesToClasstags;
+                                       this.registeredSerializers = 
reorderedRegisteredSubclassSerializers;
+
+                                       this.subclassSerializerCache = 
rebuiltCache;
+
+                                       return CompatibilityResult.compatible();
+                               }
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       public static final class PojoSerializerConfigSnapshot<T> extends 
GenericTypeSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               /**
+                * Ordered map of POJO fields to the configuration snapshots of 
their corresponding serializers.
+                *
+                * <p>Ordering of the fields is kept so that new Pojo 
serializers for previous data
+                * may reorder the fields in case they are different. The order 
of the fields needs to
+                * stay the same for binary compatibility, as the field order 
is part of the serialization format.
+                */
+               private LinkedHashMap<Field, TypeSerializerConfigSnapshot> 
fieldToSerializerConfigSnapshot;
+
+               /**
+                * Ordered map of registered subclasses to the configuration 
snapshots of their corresponding serializers.
+                *
+                * <p>Ordering of the registered subclasses is kept so that new 
Pojo serializers for previous data
+                * may retain the same class tag used for registered 
subclasses. Newly registered subclasses that
+                * weren't present before should be appended with the next 
available class tag.
+                */
+               private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> 
registeredSubclassesToSerializerConfigSnapshots;
+
+               /**
+                * Configuration snapshots of previously cached non-registered 
subclass serializers.
+                *
+                * <p>This is kept so that new Pojo serializers may eagerly 
repopulate their
+                * cache with reconfigured subclass serializers.
+                */
+               private HashMap<Class<?>, TypeSerializerConfigSnapshot> 
nonRegisteredSubclassesToSerializerConfigSnapshots;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public PojoSerializerConfigSnapshot() {}
+
+               public PojoSerializerConfigSnapshot(
+                               Class<T> pojoType,
+                               LinkedHashMap<Field, 
TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot,
+                               LinkedHashMap<Class<?>, 
TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots,
+                               HashMap<Class<?>, TypeSerializerConfigSnapshot> 
nonRegisteredSubclassesToSerializerConfigSnapshots) {
+
+                       super(pojoType);
+
+                       this.fieldToSerializerConfigSnapshot =
+                                       
Preconditions.checkNotNull(fieldToSerializerConfigSnapshot);
+                       this.registeredSubclassesToSerializerConfigSnapshots =
+                                       
Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots);
+                       this.nonRegisteredSubclassesToSerializerConfigSnapshots 
=
+                                       
Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots);
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       // --- write fields and their serializers, in order
+
+                       out.writeInt(fieldToSerializerConfigSnapshot.size());
+                       for (Map.Entry<Field, TypeSerializerConfigSnapshot> 
entry
+                               : fieldToSerializerConfigSnapshot.entrySet()) {
+                               out.writeUTF(entry.getKey().getName());
+                               
TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+                       }
+
+                       // --- write registered subclasses and their 
serializers, in registration order
+
+                       
out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
+                       for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> 
entry
+                                       : 
registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
+                               out.writeUTF(entry.getKey().getName());
+                               
TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+                       }
+
+                       // --- write snapshot of non-registered subclass 
serializer cache
+
+                       
out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
+                       for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> 
entry
+                                       : 
nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
+                               out.writeUTF(entry.getKey().getName());
+                               
TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+                       }
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       // --- read fields and their serializers, in order
+
+                       int numFields = in.readInt();
+                       this.fieldToSerializerConfigSnapshot = new 
LinkedHashMap<>(numFields);
+                       String fieldName;
+                       Field field;
+                       for (int i = 0; i < numFields; i++) {
+                               fieldName = in.readUTF();
+
+                               // search all superclasses for the field
+                               Class<?> clazz = getTypeClass();
+                               field = null;
+                               while (clazz != null) {
+                                       try {
+                                               field = 
clazz.getDeclaredField(fieldName);
+                                               field.setAccessible(true);
+                                               break;
+                                       } catch (NoSuchFieldException e) {
+                                               clazz = clazz.getSuperclass();
+                                       }
+                               }
+
+                               if (field == null) {
+                                       // the field no longer exists in the 
POJO
+                                       throw new IOException("Can't find field 
" + fieldName + " in POJO class " + getTypeClass().getName());
+                               } else {
+                                       fieldToSerializerConfigSnapshot.put(
+                                                       field,
+                                                       
TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+                               }
+                       }
+
+                       // --- read registered subclasses and their 
serializers, in registration order
+
+                       int numRegisteredSubclasses = in.readInt();
+                       this.registeredSubclassesToSerializerConfigSnapshots = 
new LinkedHashMap<>(numRegisteredSubclasses);
+                       String registeredSubclassname;
+                       Class<?> registeredSubclass;
+                       for (int i = 0; i < numRegisteredSubclasses; i++) {
+                               registeredSubclassname = in.readUTF();
+                               try {
+                                       registeredSubclass = 
Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
+                               } catch (ClassNotFoundException e) {
+                                       throw new IOException("Cannot find 
requested class " + registeredSubclassname + " in classpath.", e);
+                               }
+
+                               
this.registeredSubclassesToSerializerConfigSnapshots.put(
+                                               registeredSubclass,
+                                               
TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+                       }
+
+                       // --- read snapshot of non-registered subclass 
serializer cache
+
+                       int numCachedSubclassSerializers = in.readInt();
+                       this.nonRegisteredSubclassesToSerializerConfigSnapshots 
= new HashMap<>(numCachedSubclassSerializers);
+                       String cachedSubclassname;
+                       Class<?> cachedSubclass;
+                       for (int i = 0; i < numCachedSubclassSerializers; i++) {
+                               cachedSubclassname = in.readUTF();
+                               try {
+                                       cachedSubclass = 
Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
+                               } catch (ClassNotFoundException e) {
+                                       throw new IOException("Cannot find 
requested class " + cachedSubclassname + " in classpath.", e);
+                               }
+
+                               
this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+                                               cachedSubclass,
+                                               
TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+                       }
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               public LinkedHashMap<Field, TypeSerializerConfigSnapshot> 
getFieldToSerializerConfigSnapshot() {
+                       return fieldToSerializerConfigSnapshot;
+               }
+
+               /**
+                * Returns the map from each registered subclass to the configuration snapshot of its serializer.
+                */
+               public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> getRegisteredSubclassesToSerializerConfigSnapshots() {
+                       return this.registeredSubclassesToSerializerConfigSnapshots;
+               }
+
+               /**
+                * Returns the map from each non-registered (cached) subclass to the configuration
+                * snapshot of its serializer.
+                */
+               public HashMap<Class<?>, TypeSerializerConfigSnapshot> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
+                       return this.nonRegisteredSubclassesToSerializerConfigSnapshots;
+               }
+
+               /**
+                * Two snapshots are equal when the superclass considers them equal, the other object is
+                * a {@code PojoSerializerConfigSnapshot}, and all three serializer-snapshot maps are equal.
+                */
+               @Override
+               public boolean equals(Object obj) {
+                       if (!super.equals(obj)) {
+                               return false;
+                       }
+                       if (!(obj instanceof PojoSerializerConfigSnapshot)) {
+                               return false;
+                       }
+
+                       final PojoSerializerConfigSnapshot<?> other = (PojoSerializerConfigSnapshot<?>) obj;
+
+                       if (!fieldToSerializerConfigSnapshot.equals(other.getFieldToSerializerConfigSnapshot())) {
+                               return false;
+                       }
+                       if (!registeredSubclassesToSerializerConfigSnapshots.equals(other.getRegisteredSubclassesToSerializerConfigSnapshots())) {
+                               return false;
+                       }
+                       return nonRegisteredSubclassesToSerializerConfigSnapshots.equals(other.nonRegisteredSubclassesToSerializerConfigSnapshots);
+               }
+
+               /**
+                * Combines the superclass hash with the hash of the three serializer-snapshot maps.
+                */
+               @Override
+               public int hashCode() {
+                       final int superHash = super.hashCode();
+                       final int mapsHash = Objects.hash(
+                               fieldToSerializerConfigSnapshot,
+                               registeredSubclassesToSerializerConfigSnapshots,
+                               nonRegisteredSubclassesToSerializerConfigSnapshots);
+
+                       return superHash + mapsHash;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private void writeObject(ObjectOutputStream out)
+               throws IOException, ClassNotFoundException {
+               out.defaultWriteObject();
+               out.writeInt(fields.length);
+               for (Field field: fields) {
+                       FieldSerializer.serializeField(field, out);
+               }
+       }
+
+       private void readObject(ObjectInputStream in)
+               throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+               int numFields = in.readInt();
+               fields = new Field[numFields];
+               for (int i = 0; i < numFields; i++) {
+                       fields[i] = FieldSerializer.deserializeField(in);
+               }
+
+               cl = Thread.currentThread().getContextClassLoader();
+               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer<?>>();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Extracts the subclasses of the base POJO class registered in the 
execution config.
+        */
+       private static LinkedHashSet<Class<?>> 
getRegisteredSubclassesFromExecutionConfig(
+                       Class<?> basePojoClass,
+                       ExecutionConfig executionConfig) {
+
+               LinkedHashSet<Class<?>> subclassesInRegistrationOrder = new 
LinkedHashSet<>(executionConfig.getRegisteredPojoTypes().size());
+               for (Class<?> registeredClass : 
executionConfig.getRegisteredPojoTypes()) {
+                       if (registeredClass.equals(basePojoClass)) {
+                               continue;
+                       }
+                       if (!basePojoClass.isAssignableFrom(registeredClass)) {
+                               continue;
+                       }
+                       subclassesInRegistrationOrder.add(registeredClass);
+               }
+
+               return subclassesInRegistrationOrder;
+       }
+
+       /**
+        * Assigns an integer class tag to each registered subclass.
+        * Tags start at 0 and increase by one following the iteration order of the provided set.
+        */
+       private static LinkedHashMap<Class<?>, Integer> createRegisteredSubclassTags(LinkedHashSet<Class<?>> registeredSubclasses) {
+               final LinkedHashMap<Class<?>, Integer> tagsBySubclass = new LinkedHashMap<>();
+
+               for (Class<?> subclass : registeredSubclasses) {
+                       // set elements are unique, so the current map size is the next free tag
+                       tagsBySubclass.put(subclass, tagsBySubclass.size());
+               }
+
+               return tagsBySubclass;
+       }
+
+       /**
+        * Creates one serializer per registered subclass.
+        * The returned array follows the iteration order of the provided subclasses.
+        */
+       private static TypeSerializer<?>[] createRegisteredSubclassSerializers(
+                       LinkedHashSet<Class<?>> registeredSubclasses,
+                       ExecutionConfig executionConfig) {
+
+               final TypeSerializer<?>[] serializers = new TypeSerializer[registeredSubclasses.size()];
+
+               int position = 0;
+               for (Class<?> subclass : registeredSubclasses) {
+                       serializers[position++] = TypeExtractor.createTypeInfo(subclass).createSerializer(executionConfig);
+               }
+
+               return serializers;
+       }
+
+       /**
+        * Fetches cached serializer for a non-registered subclass;
+        * also creates the serializer if it doesn't exist yet.
+        *
+        * This method is also exposed to package-private access
+        * for testing purposes.
+        */
+       TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
+               TypeSerializer<?> result = 
subclassSerializerCache.get(subclass);
+               if (result == null) {
+                       result = createSubclassSerializer(subclass);
+                       subclassSerializerCache.put(subclass, result);
+               }
+               return result;
+       }
+
+       private TypeSerializer<?> createSubclassSerializer(Class<?> subclass) {
+               TypeSerializer<?> serializer = 
TypeExtractor.createTypeInfo(subclass).createSerializer(executionConfig);
+
+               if (serializer instanceof PojoSerializer) {
+                       PojoSerializer<?> subclassSerializer = 
(PojoSerializer<?>) serializer;
+                       subclassSerializer.copyBaseFieldOrder(this);
+               }
+
+               return serializer;
+       }
+
+       /**
+        * Looks up the 0-based position of a POJO field within this serializer's fields.
+        * Returns -1 if this POJO has no such field.
+        */
+       private int findField(Field f) {
+               for (int index = 0; index < fields.length; index++) {
+                       if (f.equals(fields[index])) {
+                               return index;
+                       }
+               }
+
+               return -1;
+       }
+
+       /**
+        * Placeholder invoked on subclass serializers by {@code createSubclassSerializer};
+        * currently does nothing.
+        */
+       private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
+               // do nothing for now, but in the future, adapt subclass serializer to have same
+               // ordering as base class serializer so that binary comparison on base class fields
+               // can work
+       }
+
+       /**
+        * Builds a configuration snapshot from the serializer's parameters and its currently
+        * cached subclass serializers.
+        *
+        * <p>The snapshot contains three maps: field to field-serializer snapshot, registered
+        * subclass to serializer snapshot, and non-registered subclass to serializer snapshot.
+        */
+       private static <T> PojoSerializerConfigSnapshot<T> buildConfigSnapshot(
+                       Class<T> pojoType,
+                       LinkedHashMap<Class<?>, Integer> registeredSubclassesToTags,
+                       TypeSerializer<?>[] registeredSubclassSerializers,
+                       Field[] fields,
+                       TypeSerializer<?>[] fieldSerializers,
+                       HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
+
+               // fields and fieldSerializers are parallel arrays; keep the field order
+               final LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldSnapshots =
+                       new LinkedHashMap<>(fields.length);
+
+               int fieldIndex = 0;
+               for (Field field : fields) {
+                       fieldSnapshots.put(field, fieldSerializers[fieldIndex].snapshotConfiguration());
+                       fieldIndex++;
+               }
+
+               // the tag of a registered subclass is the index of its serializer
+               final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSnapshots =
+                       new LinkedHashMap<>(registeredSubclassesToTags.size());
+
+               for (Map.Entry<Class<?>, Integer> taggedSubclass : registeredSubclassesToTags.entrySet()) {
+                       final TypeSerializer<?> subclassSerializer = registeredSubclassSerializers[taggedSubclass.getValue()];
+                       registeredSnapshots.put(taggedSubclass.getKey(), subclassSerializer.snapshotConfiguration());
+               }
+
+               final HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSnapshots =
+                       new LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size());
+
+               for (Map.Entry<Class<?>, TypeSerializer<?>> cachedEntry : nonRegisteredSubclassSerializerCache.entrySet()) {
+                       final TypeSerializer<?> cachedSerializer = cachedEntry.getValue();
+                       nonRegisteredSnapshots.put(cachedEntry.getKey(), cachedSerializer.snapshotConfiguration());
+               }
+
+               return new PojoSerializerConfigSnapshot<>(
+                       pojoType,
+                       fieldSnapshots,
+                       registeredSnapshots,
+                       nonRegisteredSnapshots);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Test utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       /** Exposes the POJO fields of this serializer to tests. */
+       @VisibleForTesting
+       Field[] getFields() {
+               return this.fields;
+       }
+
+       /** Exposes the field serializers of this serializer to tests. */
+       @VisibleForTesting
+       TypeSerializer<?>[] getFieldSerializers() {
+               return this.fieldSerializers;
+       }
+
+       /** Exposes the map of registered classes to their integer values to tests. */
+       @VisibleForTesting
+       LinkedHashMap<Class<?>, Integer> getRegisteredClasses() {
+               return this.registeredClasses;
+       }
+
+       /** Exposes the array of registered serializers to tests. */
+       @VisibleForTesting
+       TypeSerializer<?>[] getRegisteredSerializers() {
+               return this.registeredSerializers;
+       }
+
+       /** Exposes the subclass serializer cache to tests. */
+       @VisibleForTesting
+       HashMap<Class<?>, TypeSerializer<?>> getSubclassSerializerCache() {
+               return this.subclassSerializerCache;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index dbd5d3a..5770dac 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -18,12 +18,17 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.Arrays;
 
 import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
@@ -35,12 +40,15 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Serializer for {@link Row}.
  */
 @Internal
-public class RowSerializer extends TypeSerializer<Row> {
+public final class RowSerializer extends TypeSerializer<Row> {
 
        private static final long serialVersionUID = 1L;
-       private final boolean[] nullMask;
+
        private final TypeSerializer<Object>[] fieldSerializers;
 
+       private transient boolean[] nullMask;
+
+       @SuppressWarnings("unchecked")
        public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
                this.fieldSerializers = (TypeSerializer<Object>[]) 
checkNotNull(fieldSerializers);
                this.nullMask = new boolean[fieldSerializers.length];
@@ -231,4 +239,73 @@ public class RowSerializer extends TypeSerializer<Row> {
        public int hashCode() {
                return Arrays.hashCode(fieldSerializers);
        }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Java deserialization hook: restores the default serializable state and then re-creates
+        * the transient {@code nullMask} array with one entry per field serializer.
+        */
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+               this.nullMask = new boolean[fieldSerializers.length];
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Snapshots this serializer's configuration as the configuration snapshots of its
+        * field serializers.
+        */
+       @Override
+       public RowSerializerConfigSnapshot snapshotConfiguration() {
+               final TypeSerializerConfigSnapshot[] fieldSnapshots =
+                       TypeSerializerUtil.snapshotConfigurations(fieldSerializers);
+
+               return new RowSerializerConfigSnapshot(fieldSnapshots);
+       }
+
+       /**
+        * Checks this serializer against a previously taken configuration snapshot.
+        *
+        * <p>The snapshot must be a {@link RowSerializerConfigSnapshot} with exactly as many nested
+        * snapshots as this serializer has field serializers; otherwise migration without a
+        * fallback deserializer is reported. Each field serializer is then checked against its
+        * nested snapshot:
+        * <ul>
+        *   <li>if no field requires migration, the result is "compatible";</li>
+        *   <li>if a field requires migration and offers no convert deserializer, migration
+        *       without a fallback deserializer is reported;</li>
+        *   <li>otherwise migration is reported together with a fallback {@code RowSerializer}
+        *       that has a deserializer for every field.</li>
+        * </ul>
+        *
+        * @param configSnapshot the configuration snapshot to check against
+        * @return the compatibility result for this row serializer
+        */
+       @Override
+       public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof RowSerializerConfigSnapshot) {
+                       TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
+                               ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+
+                       if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
+                               boolean requireMigration = false;
+                               TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length];
+
+                               CompatibilityResult<?> compatResult;
+                               for (int i = 0; i < fieldSerializers.length; i++) {
+                                       compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+                                       if (compatResult.requiresMigration()) {
+                                               requireMigration = true;
+
+                                               if (compatResult.getConvertDeserializer() == null) {
+                                                       // one of the field serializers cannot provide a fallback deserializer
+                                                       return CompatibilityResult.requiresMigration(null);
+                                               } else {
+                                                       convertDeserializers[i] = compatResult.getConvertDeserializer();
+                                               }
+                                       } else {
+                                               // A compatible field serializer can read the old data itself. Without
+                                               // this entry the fallback RowSerializer below would hold a null field
+                                               // serializer whenever only some of the fields require migration.
+                                               convertDeserializers[i] = fieldSerializers[i];
+                                       }
+                               }
+
+                               if (requireMigration) {
+                                       return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers));
+                               } else {
+                                       return CompatibilityResult.compatible();
+                               }
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       /**
+        * Configuration snapshot of a {@link RowSerializer}; it carries the configuration
+        * snapshots of the row's field serializers.
+        */
+       public static final class RowSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for deserializing the configuration. */
+               public RowSerializerConfigSnapshot() {}
+
+               /** Creates a snapshot from the given field serializer configuration snapshots. */
+               public 
RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] 
fieldSerializerConfigSnapshots) {
+                       super(fieldSerializerConfigSnapshots);
+               }
+
+               /** Returns the {@code VERSION} constant of this snapshot class. */
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index afc4aa2..68d5aa8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -19,7 +19,10 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -115,4 +118,43 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
        public boolean canEqual(Object obj) {
                return obj instanceof TupleSerializerBase;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Snapshots this serializer's configuration as the tuple class plus the configuration
+        * snapshots of its field serializers.
+        */
+       @Override
+       public TupleSerializerConfigSnapshot<T> snapshotConfiguration() {
+               final TypeSerializerConfigSnapshot[] fieldSnapshots =
+                       TypeSerializerUtil.snapshotConfigurations(fieldSerializers);
+
+               return new TupleSerializerConfigSnapshot<>(tupleClass, fieldSnapshots);
+       }
+
+       /**
+        * Checks this serializer against a previously taken configuration snapshot.
+        *
+        * <p>The result is "compatible" only if the snapshot is a {@code TupleSerializerConfigSnapshot}
+        * for the same tuple class, has as many nested snapshots as there are field serializers,
+        * and no field serializer requires migration. In every other case migration without a
+        * fallback deserializer is reported.
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (!(configSnapshot instanceof TupleSerializerConfigSnapshot)) {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+
+               final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot;
+               if (!tupleClass.equals(config.getTupleClass())) {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+
+               final TypeSerializerConfigSnapshot[] nestedSnapshots = config.getNestedSerializerConfigSnapshots();
+               if (nestedSnapshots.length != fieldSerializers.length) {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+
+               for (int i = 0; i < fieldSerializers.length; i++) {
+                       if (fieldSerializers[i].ensureCompatibility(nestedSnapshots[i]).requiresMigration()) {
+                               return CompatibilityResult.requiresMigration(null);
+                       }
+               }
+
+               return CompatibilityResult.compatible();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
new file mode 100644
index 0000000..6d2bb5f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Snapshot of a tuple serializer's configuration.
+ *
+ * <p>In addition to the nested field serializer configuration snapshots handled by the
+ * superclass, this snapshot records the tuple class.
+ *
+ * @param <T> the tuple type
+ */
+@Internal
+public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+       private static final int VERSION = 1;
+
+       /** The tuple class; {@code null} until set by the constructor or by {@link #read(DataInputView)}. */
+       private Class<T> tupleClass;
+
+       /** This empty nullary constructor is required for deserializing the configuration. */
+       public TupleSerializerConfigSnapshot() {}
+
+       /**
+        * Creates a snapshot for the given tuple class and field serializer configuration snapshots.
+        *
+        * @param tupleClass the tuple class; must not be null
+        * @param fieldSerializerConfigSnapshots configuration snapshots of the field serializers
+        */
+       public TupleSerializerConfigSnapshot(
+                       Class<T> tupleClass,
+                       TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
+
+               super(fieldSerializerConfigSnapshots);
+
+               this.tupleClass = Preconditions.checkNotNull(tupleClass);
+       }
+
+       /** Writes the superclass state followed by the serialized tuple class. */
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               super.write(out);
+
+               InstantiationUtil.serializeObject(new DataOutputViewStream(out), tupleClass);
+       }
+
+       /**
+        * Reads the superclass state followed by the tuple class, which is resolved with the
+        * user code class loader.
+        *
+        * @throws IOException if reading fails or the tuple class cannot be found
+        */
+       @Override
+       public void read(DataInputView in) throws IOException {
+               super.read(in);
+
+               try {
+                       tupleClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader());
+               } catch (ClassNotFoundException e) {
+                       throw new IOException("Could not find requested tuple class in classpath.", e);
+               }
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+
+       public Class<T> getTupleClass() {
+               return tupleClass;
+       }
+
+       /**
+        * Two snapshots are equal when the superclass considers them equal and both hold the
+        * same tuple class. A snapshot created through the nullary constructor that has not been
+        * read yet has no tuple class; it is handled without a {@code NullPointerException}.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (!super.equals(obj) || !(obj instanceof TupleSerializerConfigSnapshot)) {
+                       return false;
+               }
+
+               final Class<?> otherTupleClass = ((TupleSerializerConfigSnapshot<?>) obj).getTupleClass();
+               return tupleClass == null ? otherTupleClass == null : tupleClass.equals(otherTupleClass);
+       }
+
+       @Override
+       public int hashCode() {
+               // null-safe for the same reason as equals(); unchanged for a non-null tuple class
+               return super.hashCode() * 31 + (tupleClass == null ? 0 : tupleClass.hashCode());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 56e204c..10e2330 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -19,15 +19,20 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.util.Preconditions;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,12 +44,23 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type serialized.
  */
 @Internal
-public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
+public final class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 
        private static final long serialVersionUID = 1L;
        
        private final Class<T> type;
-       
+
+       /**
+        * Map of class tag (using classname as tag) to their Kryo registration.
+        *
+        * <p>This map serves as a preview of the final registration result of
+        * the Kryo instance, taking into account registration overwrites.
+        *
+        * <p>Currently, we only have one single registration for the value 
type.
+        * Nevertheless, we keep this information here for future compatibility.
+        */
+       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
        private transient Kryo kryo;
        
        private transient T copyInstance;
@@ -53,6 +69,7 @@ public class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
        
        public ValueSerializer(Class<T> type) {
                this.type = checkNotNull(type);
+               this.kryoRegistrations = asKryoRegistrations(type);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -126,7 +143,8 @@ public class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
                        kryo.setInstantiatorStrategy(instantiatorStrategy);
 
                        this.kryo.setAsmEnabled(true);
-                       this.kryo.register(type);
+
+                       KryoUtils.applyRegistrations(this.kryo, 
kryoRegistrations.values());
                }
        }
        
@@ -152,4 +170,66 @@ public class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
        public boolean canEqual(Object obj) {
                return obj instanceof ValueSerializer;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Snapshots this serializer's configuration as a {@code ValueSerializerConfigSnapshot}
+        * created for the serialized value type.
+        */
+       @Override
+       public ValueSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return new ValueSerializerConfigSnapshot<>(type);
+       }
+
+       /**
+        * Checks this serializer against a previously taken configuration snapshot.
+        *
+        * <p>The result is "compatible" if the snapshot is a {@code ValueSerializerConfigSnapshot}
+        * whose type class equals this serializer's value type; otherwise migration without a
+        * fallback deserializer is reported.
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (!(configSnapshot instanceof ValueSerializerConfigSnapshot)) {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+
+               final ValueSerializerConfigSnapshot<T> config = (ValueSerializerConfigSnapshot<T>) configSnapshot;
+               if (!type.equals(config.getTypeClass())) {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+
+               // currently, simply checking the type of the value class is sufficient;
+               // in the future, if there are more Kryo registrations, we should try to resolve that
+               return CompatibilityResult.compatible();
+       }
+
+       /**
+        * Configuration snapshot of a {@link ValueSerializer}; it is created from the value type
+        * class and the Kryo registrations derived from that class.
+        */
+       public static class ValueSerializerConfigSnapshot<T extends Value> 
extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for deserializing the configuration. */
+               public ValueSerializerConfigSnapshot() {}
+
+               /** Creates a snapshot for the given value type class. */
+               public ValueSerializerConfigSnapshot(Class<T> valueTypeClass) {
+                       super(valueTypeClass, 
asKryoRegistrations(valueTypeClass));
+               }
+
+               /** Returns the {@code VERSION} constant of this snapshot class. */
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Java deserialization hook: restores the default serializable state and makes sure the
+        * Kryo registrations are present afterwards.
+        */
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+
+               // kryoRegistrations may be null if this value serializer is deserialized from an old version
+               if (this.kryoRegistrations == null) {
+                       kryoRegistrations = asKryoRegistrations(type);
+               }
+       }
+
+       /**
+        * Builds the single-entry Kryo registration map for the given value type, using the
+        * type's class name as the tag.
+        *
+        * @param type the value class to register; must not be null
+        * @return a map holding exactly one registration, keyed by {@code type.getName()}
+        */
+       private static LinkedHashMap<String, KryoRegistration> asKryoRegistrations(Class<?> type) {
+               Preconditions.checkNotNull(type);
+
+               LinkedHashMap<String, KryoRegistration> registration = new LinkedHashMap<>(1);
+               // type is already a Class object: type.getClass().getName() would always be
+               // "java.lang.Class", so the tag must come from type.getName()
+               registration.put(type.getName(), new KryoRegistration(type));
+
+               return registration;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index cba0c84..a172b72 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -21,18 +21,22 @@ package org.apache.flink.api.java.typeutils.runtime.kryo;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
 import org.apache.avro.generic.GenericData;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
 import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
-import 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -45,6 +49,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -72,11 +77,16 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
        // 
------------------------------------------------------------------------
 
-       private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
-       private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
registeredTypesWithSerializerClasses;
        private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
        private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
defaultSerializerClasses;
-       private final LinkedHashSet<Class<?>> registeredTypes;
+
+       /**
+        * Map of class tag (using classname as tag) to their Kryo registration.
+        *
+        * <p>This map serves as a preview of the final registration result of
+        * the Kryo instance, taking into account registration overwrites.
+        */
+       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
 
        private final Class<T> type;
        
@@ -93,26 +103,35 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        private transient Output output;
 
        // 
------------------------------------------------------------------------
+       // legacy fields; these fields cannot yet be removed to retain 
backwards compatibility
+
+       private LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
+       private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
registeredTypesWithSerializerClasses;
+       private LinkedHashSet<Class<?>> registeredTypes;
+
+       // 
------------------------------------------------------------------------
 
        public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
                this.type = checkNotNull(type);
 
                this.defaultSerializers = 
executionConfig.getDefaultKryoSerializers();
                this.defaultSerializerClasses = 
executionConfig.getDefaultKryoSerializerClasses();
-               this.registeredTypesWithSerializers = 
executionConfig.getRegisteredTypesWithKryoSerializers();
-               this.registeredTypesWithSerializerClasses = 
executionConfig.getRegisteredTypesWithKryoSerializerClasses();
-               this.registeredTypes = executionConfig.getRegisteredKryoTypes();
+
+               this.kryoRegistrations = buildKryoRegistrations(
+                               this.type,
+                               executionConfig.getRegisteredKryoTypes(),
+                               
executionConfig.getRegisteredTypesWithKryoSerializerClasses(),
+                               
executionConfig.getRegisteredTypesWithKryoSerializers());
        }
 
        /**
         * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
         */
        protected KryoSerializer(KryoSerializer<T> toCopy) {
-               registeredTypesWithSerializers = 
toCopy.registeredTypesWithSerializers;
-               registeredTypesWithSerializerClasses = 
toCopy.registeredTypesWithSerializerClasses;
                defaultSerializers = toCopy.defaultSerializers;
                defaultSerializerClasses = toCopy.defaultSerializerClasses;
-               registeredTypes = toCopy.registeredTypes;
+
+               kryoRegistrations = toCopy.kryoRegistrations;
 
                type = toCopy.type;
                if(type == null){
@@ -272,11 +291,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                if (obj instanceof KryoSerializer) {
                        KryoSerializer<?> other = (KryoSerializer<?>) obj;
 
-                       // we cannot include the Serializers here because they 
don't implement the equals method
                        return other.canEqual(this) &&
                                type == other.type &&
-                               registeredTypes.equals(other.registeredTypes) &&
-                               
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
 &&
+                               
kryoRegistrations.equals(other.kryoRegistrations) &&
                                
defaultSerializerClasses.equals(other.defaultSerializerClasses);
                } else {
                        return false;
@@ -334,7 +351,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                        //       This is due to a know issue with Kryo's 
JavaSerializer. See FLINK-6025 for details.
                        kryo.addDefaultSerializer(Throwable.class, new 
JavaSerializer());
 
-                       // Add default serializers first, so that they type 
registrations without a serializer
+                       // Add default serializers first, so that the type 
registrations without a serializer
                        // are registered with a default serializer
                        for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> entry: 
defaultSerializers.entrySet()) {
                                kryo.addDefaultSerializer(entry.getKey(), 
entry.getValue().getSerializer());
@@ -344,59 +361,152 @@ public class KryoSerializer<T> extends TypeSerializer<T> 
{
                                kryo.addDefaultSerializer(entry.getKey(), 
entry.getValue());
                        }
 
-                       // register the type of our class
-                       kryo.register(type);
+                       KryoUtils.applyRegistrations(this.kryo, 
kryoRegistrations.values());
 
-                       // register given types. we do this first so that any 
registration of a
-                       // more specific serializer overrides this
-                       for (Class<?> type : registeredTypes) {
-                               kryo.register(type);
-                       }
+                       kryo.setRegistrationRequired(false);
+                       
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+               }
+       }
 
-                       // register given serializer classes
-                       for (Map.Entry<Class<?>, Class<? extends 
Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
-                               Class<?> typeClass = e.getKey();
-                               Class<? extends Serializer<?>> serializerClass 
= e.getValue();
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
 
-                               Serializer<?> serializer =
-                                               
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
-                               kryo.register(typeClass, serializer);
-                       }
+       @Override
+       public KryoSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return new KryoSerializerConfigSnapshot<>(type, 
kryoRegistrations);
+       }
 
-                       // register given serializers
-                       for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> e : 
registeredTypesWithSerializers.entrySet()) {
-                               kryo.register(e.getKey(), 
e.getValue().getSerializer());
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof KryoSerializerConfigSnapshot) {
+                       final KryoSerializerConfigSnapshot<T> config = 
(KryoSerializerConfigSnapshot<T>) configSnapshot;
+
+                       if (type.equals(config.getTypeClass())) {
+                               LinkedHashMap<String, KryoRegistration> 
reconfiguredRegistrations = config.getKryoRegistrations();
+
+                               // reconfigure by ensuring that classes which 
were previously registered are registered
+                               // again in the exact same order; new class 
registrations will be appended.
+                               // this also overwrites any dummy placeholders 
that the restored old configuration has
+                               
reconfiguredRegistrations.putAll(kryoRegistrations);
+
+                               // check if there are still any dummy 
placeholders even after reconfiguration;
+                               // if so, then this new Kryo serializer cannot 
read old data and is therefore incompatible
+                               for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : reconfiguredRegistrations.entrySet()) {
+                                       if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
+                                               LOG.warn("The Kryo registration 
for a previously registered class {} does not have a " +
+                                                       "proper serializer, 
because its previous serializer cannot be loaded or is no " +
+                                                       "longer valid but a new 
serializer is not available", reconfiguredRegistrationEntry.getKey());
+
+                                               return 
CompatibilityResult.requiresMigration(null);
+                                       }
+                               }
+
+                               // there's actually no way to tell if new Kryo 
serializers are compatible with
+                               // the previous ones they overwrite; we can 
only signal compatibility and hope for the best
+                               this.kryoRegistrations = 
reconfiguredRegistrations;
+                               return CompatibilityResult.compatible();
                        }
-                       // this is needed for Avro but can not be added on 
demand.
-                       kryo.register(GenericData.Array.class, new 
SpecificInstanceCollectionSerializerForArrayList());
+               }
 
-                       kryo.setRegistrationRequired(false);
-                       
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       public static final class KryoSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public KryoSerializerConfigSnapshot() {}
+
+               public KryoSerializerConfigSnapshot(
+                               Class<T> typeClass,
+                               LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
+
+                       super(typeClass, kryoRegistrations);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
                }
        }
 
        // 
--------------------------------------------------------------------------------------------
-       // For testing
+       // Utilities
        // 
--------------------------------------------------------------------------------------------
-       
-       public Kryo getKryo() {
-               checkKryoInitialized();
-               return this.kryo;
+
+       /**
+        * Utility method that takes lists of registered types and their 
serializers, and resolves
+        * them into a single map such that the result will resemble the final 
registration
+        * result in Kryo.
+        */
+       private static LinkedHashMap<String, KryoRegistration> 
buildKryoRegistrations(
+                       Class<?> serializedType,
+                       LinkedHashSet<Class<?>> registeredTypes,
+                       LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
registeredTypesWithSerializerClasses,
+                       LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers) {
+
+               final LinkedHashMap<String, KryoRegistration> kryoRegistrations 
= new LinkedHashMap<>();
+
+               kryoRegistrations.put(serializedType.getName(), new 
KryoRegistration(serializedType));
+
+               for (Class<?> registeredType : checkNotNull(registeredTypes)) {
+                       kryoRegistrations.put(registeredType.getName(), new 
KryoRegistration(registeredType));
+               }
+
+               for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> 
registeredTypeWithSerializerClassEntry :
+                               
checkNotNull(registeredTypesWithSerializerClasses).entrySet()) {
+
+                       kryoRegistrations.put(
+                                       
registeredTypeWithSerializerClassEntry.getKey().getName(),
+                                       new KryoRegistration(
+                                                       
registeredTypeWithSerializerClassEntry.getKey(),
+                                                       
registeredTypeWithSerializerClassEntry.getValue()));
+               }
+
+               for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> registeredTypeWithSerializerEntry :
+                               
checkNotNull(registeredTypesWithSerializers).entrySet()) {
+
+                       kryoRegistrations.put(
+                                       
registeredTypeWithSerializerEntry.getKey().getName(),
+                                       new KryoRegistration(
+                                                       
registeredTypeWithSerializerEntry.getKey(),
+                                                       
registeredTypeWithSerializerEntry.getValue()));
+               }
+
+               kryoRegistrations.put(
+                               GenericData.Array.class.getName(),
+                               new KryoRegistration(
+                                               GenericData.Array.class,
+                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+
+               return kryoRegistrations;
        }
 
-       @Override
-       public boolean canRestoreFrom(TypeSerializer<?> other) {
-               if (other instanceof KryoSerializer) {
-                       KryoSerializer<?> otherKryo = (KryoSerializer<?>) other;
+       // 
--------------------------------------------------------------------------------------------
 
-                       // we cannot include the Serializers here because they 
don't implement the equals method
-                       return other.canEqual(this) &&
-                                       type == otherKryo.type &&
-                                       
(registeredTypes.equals(otherKryo.registeredTypes) || 
otherKryo.registeredTypes.isEmpty()) &&
-                                       
(registeredTypesWithSerializerClasses.equals(otherKryo.registeredTypesWithSerializerClasses)
 || otherKryo.registeredTypesWithSerializerClasses.isEmpty()) &&
-                                       
(defaultSerializerClasses.equals(otherKryo.defaultSerializerClasses) || 
otherKryo.defaultSerializerClasses.isEmpty());
-               } else {
-                       return false;
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+
+               // kryoRegistrations may be null if this Kryo serializer is 
deserialized from an old version
+               if (kryoRegistrations == null) {
+                       this.kryoRegistrations = buildKryoRegistrations(
+                                       type,
+                                       registeredTypes,
+                                       registeredTypesWithSerializerClasses,
+                                       registeredTypesWithSerializers);
                }
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // For testing
+       // 
--------------------------------------------------------------------------------------------
+
+       @VisibleForTesting
+       public Kryo getKryo() {
+               checkKryoInitialized();
+               return this.kryo;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 91c6145..a846703 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,6 +32,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 
@@ -85,6 +89,38 @@ public abstract class SerializerTestBase<T> extends 
TestLogger {
                        fail("Exception in test: " + e.getMessage());
                }
        }
+
+       @Test
+       public void testConfigSnapshotInstantiation() {
+               TypeSerializerConfigSnapshot configSnapshot = 
getSerializer().snapshotConfiguration();
+
+               InstantiationUtil.instantiate(configSnapshot.getClass());
+       }
+
+       @Test
+       public void testSnapshotConfigurationAndReconfigure() throws Exception {
+               final TypeSerializerConfigSnapshot configSnapshot = 
getSerializer().snapshotConfiguration();
+
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(
+                               new DataOutputViewStreamWrapper(out), 
configSnapshot);
+                       serializedConfig = out.toByteArray();
+               }
+
+               TypeSerializerConfigSnapshot restoredConfig;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       restoredConfig = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               CompatibilityResult strategy = 
getSerializer().ensureCompatibility(restoredConfig);
+               assertFalse(strategy.requiresMigration());
+
+               // also verify that the serializer's reconfigure implementation 
detects incompatibility
+               strategy = getSerializer().ensureCompatibility(new 
TestIncompatibleSerializerConfigSnapshot());
+               assertTrue(strategy.requiresMigration());
+       }
        
        @Test
        public void testGetLength() {
@@ -477,4 +513,21 @@ public abstract class SerializerTestBase<T> extends 
TestLogger {
                        }
                }
        }
+
+       public static final class TestIncompatibleSerializerConfigSnapshot 
extends TypeSerializerConfigSnapshot {
+               @Override
+               public int getVersion() {
+                       return 0;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj instanceof 
TestIncompatibleSerializerConfigSnapshot;
+               }
+
+               @Override
+               public int hashCode() {
+                       return getClass().hashCode();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
new file mode 100644
index 0000000..0783bb6
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests related to {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotTest {
+
+       /**
+        * Verifies that reading and writing configuration snapshots work 
correctly.
+        */
+       @Test
+       public void testSerializeConfigurationSnapshots() throws Exception {
+               TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, 
"foo");
+               TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, 
"bar");
+
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshots(
+                               new DataOutputViewStreamWrapper(out),
+                               configSnapshot1,
+                               configSnapshot2);
+
+                       serializedConfig = out.toByteArray();
+               }
+
+               TypeSerializerConfigSnapshot[] restoredConfigs;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       restoredConfigs = 
TypeSerializerUtil.readSerializerConfigSnapshots(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               assertEquals(2, restoredConfigs.length);
+               assertEquals(configSnapshot1, restoredConfigs[0]);
+               assertEquals(configSnapshot2, restoredConfigs[1]);
+       }
+
+       /**
+        * Verifies that deserializing config snapshots fails if the config 
class cannot be found.
+        */
+       @Test
+       public void testFailsWhenConfigurationSnapshotClassNotFound() throws 
Exception {
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(
+                               new DataOutputViewStreamWrapper(out), new 
TestConfigSnapshot(123, "foobar"));
+                       serializedConfig = out.toByteArray();
+               }
+
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       // read using a dummy classloader
+                       TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), new 
URLClassLoader(new URL[0], null));
+                       fail("Expected a ClassNotFoundException wrapped in 
IOException");
+               } catch (IOException expected) {
+                       // test passes
+               }
+       }
+
+       public static class TestConfigSnapshot extends 
TypeSerializerConfigSnapshot {
+
+               static final int VERSION = 1;
+
+               private int val;
+               private String msg;
+
+               public TestConfigSnapshot() {}
+
+               public TestConfigSnapshot(int val, String msg) {
+                       this.val = val;
+                       this.msg = msg;
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+                       out.writeInt(val);
+                       out.writeUTF(msg);
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+                       val = in.readInt();
+                       msg = in.readUTF();
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+
+                       if (obj == null) {
+                               return false;
+                       }
+
+                       if (obj instanceof TestConfigSnapshot) {
+                               return val == ((TestConfigSnapshot) obj).val && 
msg.equals(((TestConfigSnapshot) obj).msg);
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       return 31 * val + msg.hashCode();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 5e2e733..5c615de 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -18,11 +18,25 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class EnumSerializerTest extends TestLogger {
 
        @Test
@@ -41,6 +55,132 @@ public class EnumSerializerTest extends TestLogger {
                new EnumSerializer<>(EmptyEnum.class);
        }
 
+       @Test
+       public void testReconfiguration() {
+               // mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL
+               PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, 
PublicEnum.PAULA, PublicEnum.NATHANIEL};
+
+               // now, the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order"
+               EnumSerializer<PublicEnum> serializer = new 
EnumSerializer<>(PublicEnum.class);
+
+               // verify that the serializer is first using the "wrong order" (i.e., the initial new configuration)
+               assertEquals(PublicEnum.FOO.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+               assertEquals(PublicEnum.BAR.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+               assertEquals(PublicEnum.PETER.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+               assertEquals(PublicEnum.NATHANIEL.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+               assertEquals(PublicEnum.EMMA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+               assertEquals(PublicEnum.PAULA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+
+               // reconfigure and verify compatibility
+               CompatibilityResult<PublicEnum> compatResult = 
serializer.ensureCompatibility(
+                       new 
EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, 
mockPreviousOrder));
+               assertFalse(compatResult.requiresMigration());
+
+               // after reconfiguration, the order should be first the original BAR, PAULA, NATHANIEL,
+               // followed by the "new enum constants" FOO, PETER, EMMA
+               PublicEnum[] expectedOrder = {PublicEnum.BAR, PublicEnum.PAULA, 
PublicEnum.NATHANIEL, PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
+
+               int i = 0;
+               for (PublicEnum constant : expectedOrder) {
+                       assertEquals(i, 
serializer.getValueToOrdinal().get(constant).intValue());
+                       i++;
+               }
+
+               assertTrue(Arrays.equals(expectedOrder, 
serializer.getValues()));
+       }
+
+       @Test
+       public void testConfigurationSnapshotSerialization() throws Exception {
+               EnumSerializer<PublicEnum> serializer = new 
EnumSerializer<>(PublicEnum.class);
+
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       TypeSerializerUtil.writeSerializerConfigSnapshot(
+                               new DataOutputViewStreamWrapper(out), 
serializer.snapshotConfiguration());
+                       serializedConfig = out.toByteArray();
+               }
+
+               TypeSerializerConfigSnapshot restoredConfig;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       restoredConfig = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               CompatibilityResult<PublicEnum> compatResult = 
serializer.ensureCompatibility(restoredConfig);
+               assertFalse(compatResult.requiresMigration());
+
+               assertEquals(PublicEnum.FOO.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+               assertEquals(PublicEnum.BAR.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+               assertEquals(PublicEnum.PETER.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+               assertEquals(PublicEnum.NATHANIEL.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+               assertEquals(PublicEnum.EMMA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+               assertEquals(PublicEnum.PAULA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+               assertTrue(Arrays.equals(PublicEnum.values(), 
serializer.getValues()));
+       }
+
+       @Test
+       public void testSerializeEnumSerializer() throws Exception {
+               EnumSerializer<PublicEnum> serializer = new 
EnumSerializer<>(PublicEnum.class);
+
+               // verify original transient parameters
+               assertEquals(PublicEnum.FOO.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+               assertEquals(PublicEnum.BAR.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+               assertEquals(PublicEnum.PETER.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+               assertEquals(PublicEnum.NATHANIEL.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+               assertEquals(PublicEnum.EMMA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+               assertEquals(PublicEnum.PAULA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+               assertTrue(Arrays.equals(PublicEnum.values(), 
serializer.getValues()));
+
+               byte[] serializedSerializer = 
InstantiationUtil.serializeObject(serializer);
+
+               // deserialize and re-verify transient parameters
+               serializer = 
InstantiationUtil.deserializeObject(serializedSerializer, 
Thread.currentThread().getContextClassLoader());
+               assertEquals(PublicEnum.FOO.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+               assertEquals(PublicEnum.BAR.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+               assertEquals(PublicEnum.PETER.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+               assertEquals(PublicEnum.NATHANIEL.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+               assertEquals(PublicEnum.EMMA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+               assertEquals(PublicEnum.PAULA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+               assertTrue(Arrays.equals(PublicEnum.values(), 
serializer.getValues()));
+       }
+
+       @Test
+       public void testSerializeReconfiguredEnumSerializer() throws Exception {
+               // mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL
+               PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, 
PublicEnum.PAULA, PublicEnum.NATHANIEL};
+
+               // now, the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order"
+               EnumSerializer<PublicEnum> serializer = new 
EnumSerializer<>(PublicEnum.class);
+
+               // verify that the serializer is first using the "wrong order" (i.e., the initial new configuration)
+               assertEquals(PublicEnum.FOO.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
+               assertEquals(PublicEnum.BAR.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
+               assertEquals(PublicEnum.PETER.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PETER).intValue());
+               assertEquals(PublicEnum.NATHANIEL.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.NATHANIEL).intValue());
+               assertEquals(PublicEnum.EMMA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.EMMA).intValue());
+               assertEquals(PublicEnum.PAULA.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.PAULA).intValue());
+
+               // reconfigure and verify compatibility
+               CompatibilityResult<PublicEnum> compatResult = 
serializer.ensureCompatibility(
+                       new 
EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, 
mockPreviousOrder));
+               assertFalse(compatResult.requiresMigration());
+
+               // serialize and deserialize again the serializer
+               byte[] serializedSerializer = 
InstantiationUtil.serializeObject(serializer);
+               serializer = 
InstantiationUtil.deserializeObject(serializedSerializer, 
Thread.currentThread().getContextClassLoader());
+
+               // verify that after the serializer was read, the reconfigured constant ordering is untouched
+               PublicEnum[] expectedOrder = {PublicEnum.BAR, PublicEnum.PAULA, 
PublicEnum.NATHANIEL, PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
+
+               int i = 0;
+               for (PublicEnum constant : expectedOrder) {
+                       assertEquals(i, 
serializer.getValueToOrdinal().get(constant).intValue());
+                       i++;
+               }
+
+               assertTrue(Arrays.equals(expectedOrder, 
serializer.getValues()));
+       }
+
        @SafeVarargs
        public final <T extends Enum<T>> void testEnumSerializer(T... data) {
                @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
index 9fda3d0..3301aa2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
@@ -20,8 +20,6 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import 
org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
-import 
org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 
 /**
  * A test for the {@link CharPrimitiveArraySerializer}.

Reply via email to