http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a8368c4..2311158 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -34,13 +34,20 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -134,6 +141,25 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                this.subclassSerializerCache = new HashMap<>();
        }
+
+       public PojoSerializer(
+                       Class<T> clazz,
+                       Field[] fields,
+                       TypeSerializer<Object>[] fieldSerializers,
+                       LinkedHashMap<Class<?>, Integer> registeredClasses,
+                       TypeSerializer<?>[] registeredSerializers,
+                       HashMap<Class<?>, TypeSerializer<?>> 
subclassSerializerCache) {
+
+               this.clazz = checkNotNull(clazz);
+               this.fields = checkNotNull(fields);
+               this.numFields = fields.length;
+               this.fieldSerializers = checkNotNull(fieldSerializers);
+               this.registeredClasses = checkNotNull(registeredClasses);
+               this.registeredSerializers = 
checkNotNull(registeredSerializers);
+               this.subclassSerializerCache = 
checkNotNull(subclassSerializerCache);
+
+               this.executionConfig = null;
+       }
        
        @Override
        public boolean isImmutableType() {
@@ -558,6 +584,8 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
                        final PojoSerializerConfigSnapshot<T> config = 
(PojoSerializerConfigSnapshot<T>) configSnapshot;
 
+                       boolean requiresMigration = false;
+
                        if (clazz.equals(config.getTypeClass())) {
                                if (this.numFields == 
config.getFieldToSerializerConfigSnapshot().size()) {
 
@@ -572,16 +600,27 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                                (TypeSerializer<Object>[]) new 
TypeSerializer<?>[this.numFields];
 
                                        int i = 0;
-                                       for (Map.Entry<Field, 
TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry
+                                       for (Map.Entry<Field, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
fieldToConfigSnapshotEntry
                                                        : 
config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
                                                int fieldIndex = 
findField(fieldToConfigSnapshotEntry.getKey());
                                                if (fieldIndex != -1) {
                                                        reorderedFields[i] = 
fieldToConfigSnapshotEntry.getKey();
 
-                                                       compatResult = 
fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
+                                                       compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                                                       
fieldToConfigSnapshotEntry.getValue().f0,
+                                                                       
UnloadableDummyTypeSerializer.class,
+                                                                       
fieldToConfigSnapshotEntry.getValue().f1,
+                                                                       
fieldSerializers[fieldIndex]);
+
                                                        if 
(compatResult.isRequiresMigration()) {
-                                                               return 
CompatibilityResult.requiresMigration();
+                                                               
requiresMigration = true;
+
+                                                               if 
(compatResult.getConvertDeserializer() != null) {
+                                                                       
reorderedFieldSerializers[i] = (TypeSerializer<Object>) 
compatResult.getConvertDeserializer();
+                                                               } else {
+                                                                       return 
CompatibilityResult.requiresMigration();
+                                                               }
                                                        } else {
                                                                
reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
                                                        }
@@ -599,7 +638,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        final LinkedHashMap<Class<?>, Integer> 
reorderedRegisteredSubclassesToClasstags;
                                        final TypeSerializer<?>[] 
reorderedRegisteredSubclassSerializers;
 
-                                       final LinkedHashMap<Class<?>, 
TypeSerializerConfigSnapshot> previousRegistrations =
+                                       final LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousRegistrations =
                                                
config.getRegisteredSubclassesToSerializerConfigSnapshots();
 
                                        // the reconfigured list of registered 
subclasses will be the previous registered
@@ -615,11 +654,20 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                                reorderedRegisteredSubclasses, 
executionConfig);
 
                                        i = 0;
-                                       for (TypeSerializerConfigSnapshot 
previousRegisteredSerializerConfig : previousRegistrations.values()) {
+                                       for (Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> previousRegisteredSerializerConfig : 
previousRegistrations.values()) {
                                                // check compatibility of 
subclass serializer
-                                               compatResult = 
reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
+                                               compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                                               
previousRegisteredSerializerConfig.f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               
previousRegisteredSerializerConfig.f1,
+                                                               
reorderedRegisteredSubclassSerializers[i]);
+
                                                if 
(compatResult.isRequiresMigration()) {
-                                                       return 
CompatibilityResult.requiresMigration();
+                                                       requiresMigration = 
true;
+
+                                                       if 
(compatResult.getConvertDeserializer() == null) {
+                                                               return 
CompatibilityResult.requiresMigration();
+                                                       }
                                                }
 
                                                i++;
@@ -631,15 +679,26 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        // this won't be applied to this 
serializer until all compatibility checks have been completed
                                        HashMap<Class<?>, TypeSerializer<?>> 
rebuiltCache = new HashMap<>();
 
-                                       for (Map.Entry<Class<?>, 
TypeSerializerConfigSnapshot> previousCachedEntry
+                                       for (Map.Entry<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousCachedEntry
                                                        : 
config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
 
                                                TypeSerializer<?> 
cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey());
 
                                                // check compatibility of 
cached subclass serializer
-                                               compatResult = 
cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
+                                               compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                                               
previousCachedEntry.getValue().f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               
previousCachedEntry.getValue().f1,
+                                                               
cachedSerializer);
+
                                                if 
(compatResult.isRequiresMigration()) {
-                                                       return 
CompatibilityResult.requiresMigration();
+                                                       requiresMigration = 
true;
+
+                                                       if 
(compatResult.getConvertDeserializer() != null) {
+                                                               
rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
+                                                       } else {
+                                                               return 
CompatibilityResult.requiresMigration();
+                                                       }
                                                } else {
                                                        
rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
                                                }
@@ -648,15 +707,26 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        // completed compatibility checks; up 
to this point, we can just reconfigure
                                        // the serializer so that it is 
compatible and migration is not required
 
-                                       this.fields = reorderedFields;
-                                       this.fieldSerializers = 
reorderedFieldSerializers;
+                                       if (!requiresMigration) {
+                                               this.fields = reorderedFields;
+                                               this.fieldSerializers = 
reorderedFieldSerializers;
 
-                                       this.registeredClasses = 
reorderedRegisteredSubclassesToClasstags;
-                                       this.registeredSerializers = 
reorderedRegisteredSubclassSerializers;
+                                               this.registeredClasses = 
reorderedRegisteredSubclassesToClasstags;
+                                               this.registeredSerializers = 
reorderedRegisteredSubclassSerializers;
 
-                                       this.subclassSerializerCache = 
rebuiltCache;
+                                               this.subclassSerializerCache = 
rebuiltCache;
 
-                                       return CompatibilityResult.compatible();
+                                               return 
CompatibilityResult.compatible();
+                                       } else {
+                                               return 
CompatibilityResult.requiresMigration(
+                                                       new PojoSerializer<>(
+                                                               clazz,
+                                                               reorderedFields,
+                                                               
reorderedFieldSerializers,
+                                                               
reorderedRegisteredSubclassesToClasstags,
+                                                               
reorderedRegisteredSubclassSerializers,
+                                                               rebuiltCache));
+                                       }
                                }
                        }
                }
@@ -669,39 +739,56 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                private static final int VERSION = 1;
 
                /**
-                * Ordered map of POJO fields to the configuration snapshots of 
their corresponding serializers.
+                * Ordered map of POJO fields to their corresponding 
serializers and its configuration snapshots.
                 *
                 * <p>Ordering of the fields is kept so that new Pojo 
serializers for previous data
                 * may reorder the fields in case they are different. The order 
of the fields need to
                 * stay the same for binary compatibility, as the field order 
is part of the serialization format.
                 */
-               private LinkedHashMap<Field, TypeSerializerConfigSnapshot> 
fieldToSerializerConfigSnapshot;
+               private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
 
                /**
-                * Ordered map of registered subclasses to the configuration 
snapshots of their corresponding serializers.
+                * Ordered map of registered subclasses to their corresponding 
serializers and its configuration snapshots.
                 *
                 * <p>Ordering of the registered subclasses is kept so that new 
Pojo serializers for previous data
                 * may retain the same class tag used for registered 
subclasses. Newly registered subclasses that
                 * weren't present before should be appended with the next 
available class tag.
                 */
-               private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> 
registeredSubclassesToSerializerConfigSnapshots;
+               private LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots;
 
                /**
-                * Configuration snapshots of previously cached non-registered 
subclass serializers.
+                * Previously cached non-registered subclass serializers and 
its configuration snapshots.
                 *
                 * <p>This is kept so that new Pojo serializers may eagerly 
repopulate their
                 * cache with reconfigured subclass serializers.
                 */
-               private HashMap<Class<?>, TypeSerializerConfigSnapshot> 
nonRegisteredSubclassesToSerializerConfigSnapshots;
+               private HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots;
+
+               private boolean ignoreTypeSerializerSerialization;
 
                /** This empty nullary constructor is required for 
deserializing the configuration. */
                public PojoSerializerConfigSnapshot() {}
 
                public PojoSerializerConfigSnapshot(
                                Class<T> pojoType,
-                               LinkedHashMap<Field, 
TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot,
-                               LinkedHashMap<Class<?>, 
TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots,
-                               HashMap<Class<?>, TypeSerializerConfigSnapshot> 
nonRegisteredSubclassesToSerializerConfigSnapshots) {
+                               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+                               LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
registeredSubclassesToSerializerConfigSnapshots,
+                               HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots) {
+
+                       this(
+                               pojoType,
+                               fieldToSerializerConfigSnapshot,
+                               registeredSubclassesToSerializerConfigSnapshots,
+                               
nonRegisteredSubclassesToSerializerConfigSnapshots,
+                               false);
+               }
+
+               public PojoSerializerConfigSnapshot(
+                               Class<T> pojoType,
+                               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+                               LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
registeredSubclassesToSerializerConfigSnapshots,
+                               HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots,
+                               boolean ignoreTypeSerializerSerialization) {
 
                        super(pojoType);
 
@@ -711,37 +798,71 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                        
Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots);
                        this.nonRegisteredSubclassesToSerializerConfigSnapshots 
=
                                        
Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots);
+
+                       this.ignoreTypeSerializerSerialization = 
ignoreTypeSerializerSerialization;
                }
 
                @Override
                public void write(DataOutputView out) throws IOException {
                        super.write(out);
 
-                       // --- write fields and their serializers, in order
+                       try (
+                               ByteArrayOutputStreamWithPos outWithPos = new 
ByteArrayOutputStreamWithPos();
+                               DataOutputViewStreamWrapper outViewWrapper = 
new DataOutputViewStreamWrapper(outWithPos)) {
 
-                       out.writeInt(fieldToSerializerConfigSnapshot.size());
-                       for (Map.Entry<Field, TypeSerializerConfigSnapshot> 
entry
-                               : fieldToSerializerConfigSnapshot.entrySet()) {
-                               out.writeUTF(entry.getKey().getName());
-                               
TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
-                       }
+                               // --- write fields and their serializers, in 
order
 
-                       // --- write registered subclasses and their 
serializers, in registration order
+                               
out.writeInt(fieldToSerializerConfigSnapshot.size());
+                               for (Map.Entry<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
+                                               : 
fieldToSerializerConfigSnapshot.entrySet()) {
 
-                       
out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
-                       for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> 
entry
-                                       : 
registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
-                               out.writeUTF(entry.getKey().getName());
-                               
TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
-                       }
+                                       
outViewWrapper.writeUTF(entry.getKey().getName());
+
+                                       out.writeInt(outWithPos.getPosition());
+                                       if (!ignoreTypeSerializerSerialization) 
{
+                                               
TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, 
entry.getValue().f0);
+                                       }
+
+                                       out.writeInt(outWithPos.getPosition());
+                                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, 
entry.getValue().f1);
+                               }
+
+                               // --- write registered subclasses and their 
serializers, in registration order
+
+                               
out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
+                               for (Map.Entry<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+                                               : 
registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
+
+                                       
outViewWrapper.writeUTF(entry.getKey().getName());
+
+                                       out.writeInt(outWithPos.getPosition());
+                                       if (!ignoreTypeSerializerSerialization) 
{
+                                               
TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, 
entry.getValue().f0);
+                                       }
+
+                                       out.writeInt(outWithPos.getPosition());
+                                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, 
entry.getValue().f1);
+                               }
+
+                               // --- write snapshot of non-registered 
subclass serializer cache
+
+                               
out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
+                               for (Map.Entry<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+                                               : 
nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
+
+                                       
outViewWrapper.writeUTF(entry.getKey().getName());
+
+                                       out.writeInt(outWithPos.getPosition());
+                                       if (!ignoreTypeSerializerSerialization) 
{
+                                               
TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, 
entry.getValue().f0);
+                                       }
 
-                       // --- write snapshot of non-registered subclass 
serializer cache
+                                       out.writeInt(outWithPos.getPosition());
+                                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, 
entry.getValue().f1);
+                               }
 
-                       
out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
-                       for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> 
entry
-                                       : 
nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
-                               out.writeUTF(entry.getKey().getName());
-                               
TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+                               out.writeInt(outWithPos.getPosition());
+                               out.write(outWithPos.getBuf(), 0 , 
outWithPos.getPosition());
                        }
                }
 
@@ -749,74 +870,126 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                public void read(DataInputView in) throws IOException {
                        super.read(in);
 
-                       // --- read fields and their serializers, in order
-
                        int numFields = in.readInt();
-                       this.fieldToSerializerConfigSnapshot = new 
LinkedHashMap<>(numFields);
-                       String fieldName;
-                       Field field;
+                       int[] fieldSerializerOffsets = new int[numFields * 2];
                        for (int i = 0; i < numFields; i++) {
-                               fieldName = in.readUTF();
+                               fieldSerializerOffsets[i * 2] = in.readInt();
+                               fieldSerializerOffsets[i * 2 + 1] = 
in.readInt();
+                       }
 
-                               // search all superclasses for the field
-                               Class<?> clazz = getTypeClass();
-                               field = null;
-                               while (clazz != null) {
-                                       try {
-                                               field = 
clazz.getDeclaredField(fieldName);
-                                               field.setAccessible(true);
-                                               break;
-                                       } catch (NoSuchFieldException e) {
-                                               clazz = clazz.getSuperclass();
+
+                       int numRegisteredSubclasses = in.readInt();
+                       int[] registeredSerializerOffsets = new 
int[numRegisteredSubclasses * 2];
+                       for (int i = 0; i < numRegisteredSubclasses; i++) {
+                               registeredSerializerOffsets[i * 2] = 
in.readInt();
+                               registeredSerializerOffsets[i * 2 + 1] = 
in.readInt();
+                       }
+
+                       int numCachedSubclassSerializers = in.readInt();
+                       int[] cachedSerializerOffsets = new 
int[numCachedSubclassSerializers * 2];
+                       for (int i = 0; i < numCachedSubclassSerializers; i++) {
+                               cachedSerializerOffsets[i * 2] = in.readInt();
+                               cachedSerializerOffsets[i * 2 + 1] = 
in.readInt();
+                       }
+
+                       int totalBytes = in.readInt();
+                       byte[] buffer = new byte[totalBytes];
+                       in.readFully(buffer);
+
+                       try (
+                               ByteArrayInputStreamWithPos inWithPos = new 
ByteArrayInputStreamWithPos(buffer);
+                               DataInputViewStreamWrapper inViewWrapper = new 
DataInputViewStreamWrapper(inWithPos)) {
+
+                               // --- read fields and their serializers, in 
order
+
+                               this.fieldToSerializerConfigSnapshot = new 
LinkedHashMap<>(numFields);
+                               String fieldName;
+                               Field field;
+                               TypeSerializer<?> fieldSerializer;
+                               TypeSerializerConfigSnapshot 
fieldSerializerConfigSnapshot;
+                               for (int i = 0; i < numFields; i++) {
+                                       fieldName = inViewWrapper.readUTF();
+
+                                       // search all superclasses for the field
+                                       Class<?> clazz = getTypeClass();
+                                       field = null;
+                                       while (clazz != null) {
+                                               try {
+                                                       field = 
clazz.getDeclaredField(fieldName);
+                                                       
field.setAccessible(true);
+                                                       break;
+                                               } catch (NoSuchFieldException 
e) {
+                                                       clazz = 
clazz.getSuperclass();
+                                               }
                                        }
-                               }
 
-                               if (field == null) {
-                                       // the field no longer exists in the 
POJO
-                                       throw new IOException("Can't find field 
" + fieldName + " in POJO class " + getTypeClass().getName());
-                               } else {
-                                       fieldToSerializerConfigSnapshot.put(
+                                       if (field == null) {
+                                               // the field no longer exists 
in the POJO
+                                               throw new IOException("Can't 
find field " + fieldName + " in POJO class " + getTypeClass().getName());
+                                       } else {
+                                               
inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+                                               fieldSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, 
getUserCodeClassLoader());
+
+                                               
inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+                                               fieldSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, 
getUserCodeClassLoader());
+
+                                               
fieldToSerializerConfigSnapshot.put(
                                                        field,
-                                                       
TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+                                                       new 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, 
fieldSerializerConfigSnapshot));
+                                       }
                                }
-                       }
 
-                       // --- read registered subclasses and their 
serializers, in registration order
+                               // --- read registered subclasses and their 
serializers, in registration order
 
-                       int numRegisteredSubclasses = in.readInt();
-                       this.registeredSubclassesToSerializerConfigSnapshots = 
new LinkedHashMap<>(numRegisteredSubclasses);
-                       String registeredSubclassname;
-                       Class<?> registeredSubclass;
-                       for (int i = 0; i < numRegisteredSubclasses; i++) {
-                               registeredSubclassname = in.readUTF();
-                               try {
-                                       registeredSubclass = 
Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
-                               } catch (ClassNotFoundException e) {
-                                       throw new IOException("Cannot find 
requested class " + registeredSubclassname + " in classpath.", e);
-                               }
+                               
this.registeredSubclassesToSerializerConfigSnapshots = new 
LinkedHashMap<>(numRegisteredSubclasses);
+                               String registeredSubclassname;
+                               Class<?> registeredSubclass;
+                               TypeSerializer<?> registeredSubclassSerializer;
+                               TypeSerializerConfigSnapshot 
registeredSubclassSerializerConfigSnapshot;
+                               for (int i = 0; i < numRegisteredSubclasses; 
i++) {
+                                       registeredSubclassname = 
inViewWrapper.readUTF();
+                                       try {
+                                               registeredSubclass = 
Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
+                                       } catch (ClassNotFoundException e) {
+                                               throw new IOException("Cannot 
find requested class " + registeredSubclassname + " in classpath.", e);
+                                       }
 
-                               
this.registeredSubclassesToSerializerConfigSnapshots.put(
-                                               registeredSubclass,
-                                               
TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
-                       }
+                                       
inWithPos.setPosition(registeredSerializerOffsets[i * 2]);
+                                       registeredSubclassSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, 
getUserCodeClassLoader());
 
-                       // --- read snapshot of non-registered subclass 
serializer cache
+                                       
inWithPos.setPosition(registeredSerializerOffsets[i * 2 + 1]);
+                                       
registeredSubclassSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, 
getUserCodeClassLoader());
 
-                       int numCachedSubclassSerializers = in.readInt();
-                       this.nonRegisteredSubclassesToSerializerConfigSnapshots 
= new HashMap<>(numCachedSubclassSerializers);
-                       String cachedSubclassname;
-                       Class<?> cachedSubclass;
-                       for (int i = 0; i < numCachedSubclassSerializers; i++) {
-                               cachedSubclassname = in.readUTF();
-                               try {
-                                       cachedSubclass = 
Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
-                               } catch (ClassNotFoundException e) {
-                                       throw new IOException("Cannot find 
requested class " + cachedSubclassname + " in classpath.", e);
+                                       
this.registeredSubclassesToSerializerConfigSnapshots.put(
+                                               registeredSubclass,
+                                               new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(registeredSubclassSerializer, 
registeredSubclassSerializerConfigSnapshot));
                                }
 
-                               
this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+                               // --- read snapshot of non-registered subclass 
serializer cache
+
+                               
this.nonRegisteredSubclassesToSerializerConfigSnapshots = new 
HashMap<>(numCachedSubclassSerializers);
+                               String cachedSubclassname;
+                               Class<?> cachedSubclass;
+                               TypeSerializer<?> cachedSubclassSerializer;
+                               TypeSerializerConfigSnapshot 
cachedSubclassSerializerConfigSnapshot;
+                               for (int i = 0; i < 
numCachedSubclassSerializers; i++) {
+                                       cachedSubclassname = 
inViewWrapper.readUTF();
+                                       try {
+                                               cachedSubclass = 
Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
+                                       } catch (ClassNotFoundException e) {
+                                               throw new IOException("Cannot 
find requested class " + cachedSubclassname + " in classpath.", e);
+                                       }
+
+                                       
inWithPos.setPosition(cachedSerializerOffsets[i * 2]);
+                                       cachedSubclassSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, 
getUserCodeClassLoader());
+
+                                       
inWithPos.setPosition(cachedSerializerOffsets[i * 2 + 1]);
+                                       cachedSubclassSerializerConfigSnapshot 
= TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, 
getUserCodeClassLoader());
+
+                                       
this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
                                                cachedSubclass,
-                                               
TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+                                               new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(cachedSubclassSerializer, 
cachedSubclassSerializerConfigSnapshot));
+                               }
                        }
                }
 
@@ -825,15 +998,15 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        return VERSION;
                }
 
-               public LinkedHashMap<Field, TypeSerializerConfigSnapshot> 
getFieldToSerializerConfigSnapshot() {
+               public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
                        return fieldToSerializerConfigSnapshot;
                }
 
-               public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> 
getRegisteredSubclassesToSerializerConfigSnapshots() {
+               public LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
getRegisteredSubclassesToSerializerConfigSnapshots() {
                        return registeredSubclassesToSerializerConfigSnapshots;
                }
 
-               public HashMap<Class<?>, TypeSerializerConfigSnapshot> 
getNonRegisteredSubclassesToSerializerConfigSnapshots() {
+               public HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
getNonRegisteredSubclassesToSerializerConfigSnapshots() {
                        return 
nonRegisteredSubclassesToSerializerConfigSnapshots;
                }
 
@@ -1001,27 +1174,35 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        TypeSerializer<?>[] fieldSerializers,
                        HashMap<Class<?>, TypeSerializer<?>> 
nonRegisteredSubclassSerializerCache) {
 
-               final LinkedHashMap<Field, TypeSerializerConfigSnapshot> 
fieldToSerializerConfigSnapshots =
+               final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
                        new LinkedHashMap<>(fields.length);
 
                for (int i = 0; i < fields.length; i++) {
-                       fieldToSerializerConfigSnapshots.put(fields[i], 
fieldSerializers[i].snapshotConfiguration());
+                       fieldToSerializerConfigSnapshots.put(
+                               fields[i],
+                               new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(fieldSerializers[i], 
fieldSerializers[i].snapshotConfiguration()));
                }
 
-               final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> 
registeredSubclassesToSerializerConfigSnapshots =
+               final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots =
                                new 
LinkedHashMap<>(registeredSubclassesToTags.size());
 
                for (Map.Entry<Class<?>, Integer> entry : 
registeredSubclassesToTags.entrySet()) {
                        registeredSubclassesToSerializerConfigSnapshots.put(
                                        entry.getKey(),
-                                       
registeredSubclassSerializers[entry.getValue()].snapshotConfiguration());
+                                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                                               
registeredSubclassSerializers[entry.getValue()],
+                                               
registeredSubclassSerializers[entry.getValue()].snapshotConfiguration()));
                }
 
-               final HashMap<Class<?>, TypeSerializerConfigSnapshot> 
nonRegisteredSubclassesToSerializerConfigSnapshots =
+               final HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots =
                                new 
LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size());
 
                for (Map.Entry<Class<?>, TypeSerializer<?>> entry : 
nonRegisteredSubclassSerializerCache.entrySet()) {
-                       
nonRegisteredSubclassesToSerializerConfigSnapshots.put(entry.getKey(), 
entry.getValue().snapshotConfiguration());
+                       nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+                               entry.getKey(),
+                               new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                                       entry.getValue(),
+                                       
entry.getValue().snapshotConfiguration()));
                }
 
                return new PojoSerializerConfigSnapshot<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index ba41d4b..bd08b04 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -19,11 +19,13 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
@@ -31,6 +33,7 @@ import org.apache.flink.types.Row;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.Arrays;
+import java.util.List;
 
 import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
 import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
@@ -254,22 +257,28 @@ public final class RowSerializer extends 
TypeSerializer<Row> {
 
        @Override
        public RowSerializerConfigSnapshot snapshotConfiguration() {
-               return new 
RowSerializerConfigSnapshot(TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+               return new RowSerializerConfigSnapshot(fieldSerializers);
        }
 
        @Override
        public CompatibilityResult<Row> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof RowSerializerConfigSnapshot) {
-                       TypeSerializerConfigSnapshot[] 
fieldSerializerConfigSnapshots =
-                               ((RowSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
+                               ((RowSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
 
-                       if (fieldSerializerConfigSnapshots.length == 
fieldSerializers.length) {
+                       if (previousFieldSerializersAndConfigs.size() == 
fieldSerializers.length) {
                                boolean requireMigration = false;
                                TypeSerializer<?>[] convertDeserializers = new 
TypeSerializer<?>[fieldSerializers.length];
 
                                CompatibilityResult<?> compatResult;
-                               for (int i = 0; i < fieldSerializers.length; 
i++) {
-                                       compatResult = 
fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+                               int i = 0;
+                               for (Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
+                                       compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                                       f.f0,
+                                                       
UnloadableDummyTypeSerializer.class,
+                                                       f.f1,
+                                                       fieldSerializers[i]);
+
                                        if (compatResult.isRequiresMigration()) 
{
                                                requireMigration = true;
 
@@ -281,6 +290,8 @@ public final class RowSerializer extends 
TypeSerializer<Row> {
                                                                new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
                                                }
                                        }
+
+                                       i++;
                                }
 
                                if (requireMigration) {
@@ -301,8 +312,8 @@ public final class RowSerializer extends 
TypeSerializer<Row> {
                /** This empty nullary constructor is required for 
deserializing the configuration. */
                public RowSerializerConfigSnapshot() {}
 
-               public 
RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] 
fieldSerializerConfigSnapshots) {
-                       super(fieldSerializerConfigSnapshots);
+               public RowSerializerConfigSnapshot(TypeSerializer[] 
fieldSerializers) {
+                       super(fieldSerializers);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index f485c3e..911c96f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -156,4 +156,9 @@ public class TupleSerializer<T extends Tuple> extends 
TupleSerializerBase<T> {
                        throw new RuntimeException("Cannot instantiate tuple.", 
e);
                }
        }
+
+       @Override
+       protected TupleSerializerBase<T> createSerializerInstance(Class<T> 
tupleClass, TypeSerializer<?>[] fieldSerializers) {
+               return new TupleSerializer<>(tupleClass, fieldSerializers);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 032c3f1..f12dcd9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -20,14 +20,18 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -125,9 +129,7 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
 
        @Override
        public TupleSerializerConfigSnapshot<T> snapshotConfiguration() {
-               return new TupleSerializerConfigSnapshot<>(
-                               tupleClass,
-                               
TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+               return new TupleSerializerConfigSnapshot<>(tupleClass, 
fieldSerializers);
        }
 
        @SuppressWarnings("unchecked")
@@ -137,24 +139,48 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
                        final TupleSerializerConfigSnapshot<T> config = 
(TupleSerializerConfigSnapshot<T>) configSnapshot;
 
                        if (tupleClass.equals(config.getTupleClass())) {
-                               TypeSerializerConfigSnapshot[] 
fieldSerializerConfigSnapshots =
-                                       ((TupleSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+                               List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
+                                       ((TupleSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                               if (previousFieldSerializersAndConfigs.size() 
== fieldSerializers.length) {
+
+                                       TypeSerializer<Object>[] 
convertFieldSerializers = new TypeSerializer[fieldSerializers.length];
+                                       boolean requiresMigration = false;
+                                       CompatibilityResult<Object> 
compatResult;
+                                       int i = 0;
+                                       for (Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
+                                               compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                                               f.f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               f.f1,
+                                                               
fieldSerializers[i]);
 
-                               if (fieldSerializerConfigSnapshots.length == 
fieldSerializers.length) {
-
-                                       CompatibilityResult compatResult;
-                                       for (int i = 0; i < 
fieldSerializers.length; i++) {
-                                               compatResult = 
fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
                                                if 
(compatResult.isRequiresMigration()) {
-                                                       return 
CompatibilityResult.requiresMigration();
+                                                       requiresMigration = 
true;
+
+                                                       if 
(compatResult.getConvertDeserializer() != null) {
+                                                               
convertFieldSerializers[i] =
+                                                                       new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
+                                                       } else {
+                                                               return 
CompatibilityResult.requiresMigration();
+                                                       }
                                                }
+
+                                               i++;
                                        }
 
-                                       return CompatibilityResult.compatible();
+                                       if (!requiresMigration) {
+                                               return 
CompatibilityResult.compatible();
+                                       } else {
+                                               return 
CompatibilityResult.requiresMigration(
+                                                       
createSerializerInstance(tupleClass, convertFieldSerializers));
+                                       }
                                }
                        }
                }
 
                return CompatibilityResult.requiresMigration();
        }
+
+       protected abstract TupleSerializerBase<T> 
createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 6d2bb5f..1e7701c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
@@ -41,11 +41,8 @@ public final class TupleSerializerConfigSnapshot<T> extends 
CompositeTypeSeriali
        /** This empty nullary constructor is required for deserializing the 
configuration. */
        public TupleSerializerConfigSnapshot() {}
 
-       public TupleSerializerConfigSnapshot(
-                       Class<T> tupleClass,
-                       TypeSerializerConfigSnapshot[] 
fieldSerializerConfigSnapshots) {
-
-               super(fieldSerializerConfigSnapshots);
+       public TupleSerializerConfigSnapshot(Class<T> tupleClass, 
TypeSerializer<?>[] fieldSerializers) {
+               super(fieldSerializers);
 
                this.tupleClass = Preconditions.checkNotNull(tupleClass);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 73c4379..57015c7 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -109,14 +109,14 @@ public abstract class SerializerTestBase<T> extends 
TestLogger {
 
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
                                new DataOutputViewStreamWrapper(out), 
configSnapshot);
                        serializedConfig = out.toByteArray();
                }
 
                TypeSerializerConfigSnapshot restoredConfig;
                try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       restoredConfig = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       restoredConfig = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
deleted file mode 100644
index 0783bb6..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Unit tests related to {@link TypeSerializerConfigSnapshot}.
- */
-public class TypeSerializerConfigSnapshotTest {
-
-       /**
-        * Verifies that reading and writing configuration snapshots work 
correctly.
-        */
-       @Test
-       public void testSerializeConfigurationSnapshots() throws Exception {
-               TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, 
"foo");
-               TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, 
"bar");
-
-               byte[] serializedConfig;
-               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshots(
-                               new DataOutputViewStreamWrapper(out),
-                               configSnapshot1,
-                               configSnapshot2);
-
-                       serializedConfig = out.toByteArray();
-               }
-
-               TypeSerializerConfigSnapshot[] restoredConfigs;
-               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       restoredConfigs = 
TypeSerializerUtil.readSerializerConfigSnapshots(
-                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
-               }
-
-               assertEquals(2, restoredConfigs.length);
-               assertEquals(configSnapshot1, restoredConfigs[0]);
-               assertEquals(configSnapshot2, restoredConfigs[1]);
-       }
-
-       /**
-        * Verifies that deserializing config snapshots fail if the config 
class could not be found.
-        */
-       @Test
-       public void testFailsWhenConfigurationSnapshotClassNotFound() throws 
Exception {
-               byte[] serializedConfig;
-               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(
-                               new DataOutputViewStreamWrapper(out), new 
TestConfigSnapshot(123, "foobar"));
-                       serializedConfig = out.toByteArray();
-               }
-
-               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       // read using a dummy classloader
-                       TypeSerializerUtil.readSerializerConfigSnapshot(
-                               new DataInputViewStreamWrapper(in), new 
URLClassLoader(new URL[0], null));
-                       fail("Expected a ClassNotFoundException wrapped in 
IOException");
-               } catch (IOException expected) {
-                       // test passes
-               }
-       }
-
-       public static class TestConfigSnapshot extends 
TypeSerializerConfigSnapshot {
-
-               static final int VERSION = 1;
-
-               private int val;
-               private String msg;
-
-               public TestConfigSnapshot() {}
-
-               public TestConfigSnapshot(int val, String msg) {
-                       this.val = val;
-                       this.msg = msg;
-               }
-
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       super.write(out);
-                       out.writeInt(val);
-                       out.writeUTF(msg);
-               }
-
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       super.read(in);
-                       val = in.readInt();
-                       msg = in.readUTF();
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj == this) {
-                               return true;
-                       }
-
-                       if (obj == null) {
-                               return false;
-                       }
-
-                       if (obj instanceof TestConfigSnapshot) {
-                               return val == ((TestConfigSnapshot) obj).val && 
msg.equals(((TestConfigSnapshot) obj).msg);
-                       } else {
-                               return false;
-                       }
-               }
-
-               @Override
-               public int hashCode() {
-                       return 31 * val + msg.hashCode();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
deleted file mode 100644
index db1b4ef..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(InstantiationUtil.class)
-public class TypeSerializerSerializationProxyTest {
-
-       @Test
-       public void testStateSerializerSerializationProxy() throws Exception {
-
-               TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
-               TypeSerializerSerializationProxy<?> proxy = new 
TypeSerializerSerializationProxy<>(serializer);
-
-               byte[] serialized;
-               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       proxy.write(new DataOutputViewStreamWrapper(out));
-                       serialized = out.toByteArray();
-               }
-
-               proxy = new 
TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader());
-
-               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       proxy.read(new DataInputViewStreamWrapper(in));
-               }
-
-               Assert.assertEquals(serializer, proxy.getTypeSerializer());
-       }
-
-       @Test
-       public void testStateSerializerSerializationProxyClassNotFound() throws 
Exception {
-
-               TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
-               TypeSerializerSerializationProxy<?> proxy = new 
TypeSerializerSerializationProxy<>(serializer);
-
-               byte[] serialized;
-               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       proxy.write(new DataOutputViewStreamWrapper(out));
-                       serialized = out.toByteArray();
-               }
-
-               proxy = new TypeSerializerSerializationProxy<>(new 
URLClassLoader(new URL[0], null));
-
-               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       proxy.read(new DataInputViewStreamWrapper(in));
-                       fail("ClassNotFoundException expected, leading to 
IOException");
-               } catch (IOException expected) {
-
-               }
-
-               proxy = new TypeSerializerSerializationProxy<>(new 
URLClassLoader(new URL[0], null), true);
-
-               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       proxy.read(new DataInputViewStreamWrapper(in));
-               }
-
-               Assert.assertTrue(proxy.getTypeSerializer() instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
-
-               Assert.assertArrayEquals(
-                               InstantiationUtil.serializeObject(serializer),
-                               
((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) 
proxy.getTypeSerializer()).getActualBytes());
-       }
-
-       @Test
-       public void testStateSerializerSerializationProxyInvalidClass() throws 
Exception {
-
-               TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
-               TypeSerializerSerializationProxy<?> proxy = new 
TypeSerializerSerializationProxy<>(serializer);
-
-               byte[] serialized;
-               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       proxy.write(new DataOutputViewStreamWrapper(out));
-                       serialized = out.toByteArray();
-               }
-
-               PowerMockito.spy(InstantiationUtil.class);
-               PowerMockito
-                       .doThrow(new InvalidClassException("test invalid class 
exception"))
-                       .when(InstantiationUtil.class, "deserializeObject", 
any(byte[].class), any(ClassLoader.class));
-
-               proxy = new TypeSerializerSerializationProxy<>(new 
URLClassLoader(new URL[0], null));
-
-               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       proxy.read(new DataInputViewStreamWrapper(in));
-                       fail("InvalidClassException expected, leading to 
IOException");
-               } catch (IOException expected) {
-
-               }
-
-               proxy = new TypeSerializerSerializationProxy<>(new 
URLClassLoader(new URL[0], null), true);
-
-               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       proxy.read(new DataInputViewStreamWrapper(in));
-               }
-
-               Assert.assertTrue(proxy.getTypeSerializer() instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
-
-               Assert.assertArrayEquals(
-                       InstantiationUtil.serializeObject(serializer),
-                       
((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) 
proxy.getTypeSerializer()).getActualBytes());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
new file mode 100644
index 0000000..738644b
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link TypeSerializerSerializationUtil}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TypeSerializerSerializationUtil.class)
+public class TypeSerializerSerializationUtilTest {
+
+       /**
+        * Verifies that reading and writing serializers work correctly.
+        */
+       @Test
+       public void testSerializerSerialization() throws Exception {
+
+               TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+               byte[] serialized;
+               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
+                       TypeSerializerSerializationUtil.writeSerializer(new 
DataOutputViewStreamWrapper(out), serializer);
+                       serialized = out.toByteArray();
+               }
+
+               TypeSerializer<?> deserializedSerializer;
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       deserializedSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               Assert.assertEquals(serializer, deserializedSerializer);
+       }
+
+       /**
+        * Verifies deserialization failure cases when reading a serializer 
from bytes, in the
+        * case of a {@link ClassNotFoundException}.
+        */
+       @Test
+       public void testSerializerSerializationWithClassNotFound() throws 
Exception {
+
+               TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+               byte[] serialized;
+               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
+                       TypeSerializerSerializationUtil.writeSerializer(new 
DataOutputViewStreamWrapper(out), serializer);
+                       serialized = out.toByteArray();
+               }
+
+               TypeSerializer<?> deserializedSerializer;
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       deserializedSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(
+                               new DataInputViewStreamWrapper(in), new 
URLClassLoader(new URL[0], null));
+               }
+               Assert.assertEquals(null, deserializedSerializer);
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       deserializedSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(
+                               new DataInputViewStreamWrapper(in), new 
URLClassLoader(new URL[0], null), true);
+               }
+               Assert.assertTrue(deserializedSerializer instanceof 
UnloadableDummyTypeSerializer);
+
+               Assert.assertArrayEquals(
+                               InstantiationUtil.serializeObject(serializer),
+                               ((UnloadableDummyTypeSerializer<?>) 
deserializedSerializer).getActualBytes());
+       }
+
+       /**
+        * Verifies deserialization failure cases when reading a serializer 
from bytes, in the
+        * case of a {@link InvalidClassException}.
+        */
+       @Test
+       public void testSerializerSerializationWithInvalidClass() throws 
Exception {
+
+               TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+               byte[] serialized;
+               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
+                       TypeSerializerSerializationUtil.writeSerializer(new 
DataOutputViewStreamWrapper(out), serializer);
+                       serialized = out.toByteArray();
+               }
+
+               TypeSerializer<?> deserializedSerializer;
+
+               // mock failure when deserializing serializers
+               
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                               
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+               doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+               
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       deserializedSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+               Assert.assertEquals(null, deserializedSerializer);
+       }
+
+       /**
+        * Verifies that reading and writing configuration snapshots work 
correctly.
+        */
+       @Test
+       public void testSerializeConfigurationSnapshots() throws Exception {
+               TypeSerializerSerializationUtilTest.TestConfigSnapshot 
configSnapshot1 =
+                       new 
TypeSerializerSerializationUtilTest.TestConfigSnapshot(1, "foo");
+
+               TypeSerializerSerializationUtilTest.TestConfigSnapshot 
configSnapshot2 =
+                       new 
TypeSerializerSerializationUtilTest.TestConfigSnapshot(2, "bar");
+
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshots(
+                               new DataOutputViewStreamWrapper(out),
+                               configSnapshot1,
+                               configSnapshot2);
+
+                       serializedConfig = out.toByteArray();
+               }
+
+               TypeSerializerConfigSnapshot[] restoredConfigs;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       restoredConfigs = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshots(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               assertEquals(2, restoredConfigs.length);
+               assertEquals(configSnapshot1, restoredConfigs[0]);
+               assertEquals(configSnapshot2, restoredConfigs[1]);
+       }
+
+       /**
+        * Verifies that deserializing config snapshots fail if the config 
class could not be found.
+        */
+       @Test
+       public void testFailsWhenConfigurationSnapshotClassNotFound() throws 
Exception {
+               byte[] serializedConfig;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
+                               new DataOutputViewStreamWrapper(out), new 
TypeSerializerSerializationUtilTest.TestConfigSnapshot(123, "foobar"));
+                       serializedConfig = out.toByteArray();
+               }
+
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       // read using a dummy classloader
+                       
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), new 
URLClassLoader(new URL[0], null));
+                       fail("Expected a ClassNotFoundException wrapped in 
IOException");
+               } catch (IOException expected) {
+                       // test passes
+               }
+       }
+
+       /**
+        * Verifies resilience to serializer deserialization failures when 
writing and reading
+        * serializer and config snapshot pairs.
+        */
+       @Test
+       public void 
testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures()
 throws Exception {
+               List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
serializersAndConfigs = Arrays.asList(
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               IntSerializer.INSTANCE, 
IntSerializer.INSTANCE.snapshotConfiguration()),
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               DoubleSerializer.INSTANCE, 
DoubleSerializer.INSTANCE.snapshotConfiguration()));
+
+               byte[] serializedSerializersAndConfigs;
+               try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                       
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+                                       new DataOutputViewStreamWrapper(out), 
serializersAndConfigs);
+                       serializedSerializersAndConfigs = out.toByteArray();
+               }
+
+               // mock failure when deserializing serializers
+               
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                               
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+               doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+               
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+               List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
restored;
+               try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedSerializersAndConfigs)) {
+                       restored = 
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               Assert.assertEquals(2, restored.size());
+               Assert.assertEquals(null, restored.get(0).f0);
+               
Assert.assertEquals(IntSerializer.INSTANCE.snapshotConfiguration(), 
restored.get(0).f1);
+               Assert.assertEquals(null, restored.get(1).f0);
+               
Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), 
restored.get(1).f1);
+       }
+
+       public static class TestConfigSnapshot extends 
TypeSerializerConfigSnapshot {
+
+               static final int VERSION = 1;
+
+               private int val;
+               private String msg;
+
+               public TestConfigSnapshot() {}
+
+               public TestConfigSnapshot(int val, String msg) {
+                       this.val = val;
+                       this.msg = msg;
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+                       out.writeInt(val);
+                       out.writeUTF(msg);
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+                       val = in.readInt();
+                       msg = in.readUTF();
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+
+                       if (obj == null) {
+                               return false;
+                       }
+
+                       if (obj instanceof 
TypeSerializerSerializationUtilTest.TestConfigSnapshot) {
+                               return val == 
((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).val
+                                       && 
msg.equals(((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).msg);
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       return 31 * val + msg.hashCode();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 16ea945..e3ce3ee 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.common.typeutils.base;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
@@ -95,14 +95,14 @@ public class EnumSerializerTest extends TestLogger {
 
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
                                new DataOutputViewStreamWrapper(out), 
serializer.snapshotConfiguration());
                        serializedConfig = out.toByteArray();
                }
 
                TypeSerializerConfigSnapshot restoredConfig;
                try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       restoredConfig = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       restoredConfig = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index c77ffcc..10f4708 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -20,12 +20,14 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 
@@ -39,8 +41,9 @@ import 
org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -50,14 +53,23 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 
 /**
  * A test for the {@link PojoSerializer}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.TestUserClass> {
        private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
 
@@ -286,7 +298,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer1.snapshotConfiguration();
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
                        serializedConfig = out.toByteArray();
                }
 
@@ -295,7 +307,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
 
                // read configuration again from bytes
                try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       pojoSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
@@ -322,7 +334,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer.snapshotConfiguration();
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
                        serializedConfig = out.toByteArray();
                }
 
@@ -335,7 +347,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
 
                // read configuration from bytes
                try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       pojoSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
@@ -368,7 +380,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer.snapshotConfiguration();
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
                        serializedConfig = out.toByteArray();
                }
 
@@ -378,7 +390,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
 
                // read configuration from bytes
                try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       pojoSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
@@ -426,7 +438,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = 
pojoSerializer.snapshotConfiguration();
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
                        serializedConfig = out.toByteArray();
                }
 
@@ -438,7 +450,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
 
                // read configuration from bytes
                try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       pojoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       pojoSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
@@ -472,14 +484,38 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                // creating this serializer just for generating config 
snapshots of the field serializers
                PojoSerializer<TestUserClass> ser = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
-               LinkedHashMap<Field, TypeSerializerConfigSnapshot> 
mockOriginalFieldToSerializerConfigSnapshot =
+               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
                        new LinkedHashMap<>(mockOriginalFieldOrder.length);
-               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], 
ser.getFieldSerializers()[3].snapshotConfiguration());
-               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], 
ser.getFieldSerializers()[2].snapshotConfiguration());
-               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], 
ser.getFieldSerializers()[5].snapshotConfiguration());
-               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], 
ser.getFieldSerializers()[0].snapshotConfiguration());
-               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], 
ser.getFieldSerializers()[1].snapshotConfiguration());
-               
mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], 
ser.getFieldSerializers()[4].snapshotConfiguration());
+               mockOriginalFieldToSerializerConfigSnapshot.put(
+                       mockOriginalFieldOrder[0],
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               ser.getFieldSerializers()[3],
+                               
ser.getFieldSerializers()[3].snapshotConfiguration()));
+               mockOriginalFieldToSerializerConfigSnapshot.put(
+                       mockOriginalFieldOrder[1],
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               ser.getFieldSerializers()[2],
+                               
ser.getFieldSerializers()[2].snapshotConfiguration()));
+               mockOriginalFieldToSerializerConfigSnapshot.put(
+                       mockOriginalFieldOrder[2],
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               ser.getFieldSerializers()[5],
+                               
ser.getFieldSerializers()[5].snapshotConfiguration()));
+               mockOriginalFieldToSerializerConfigSnapshot.put(
+                       mockOriginalFieldOrder[3],
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               ser.getFieldSerializers()[0],
+                               
ser.getFieldSerializers()[0].snapshotConfiguration()));
+               mockOriginalFieldToSerializerConfigSnapshot.put(
+                       mockOriginalFieldOrder[4],
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               ser.getFieldSerializers()[1],
+                               
ser.getFieldSerializers()[1].snapshotConfiguration()));
+               mockOriginalFieldToSerializerConfigSnapshot.put(
+                       mockOriginalFieldOrder[5],
+                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                               ser.getFieldSerializers()[4],
+                               
ser.getFieldSerializers()[4].snapshotConfiguration()));
 
                PojoSerializer<TestUserClass> pojoSerializer = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
@@ -494,13 +530,11 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                        new PojoSerializer.PojoSerializerConfigSnapshot<>(
                                TestUserClass.class,
                                mockOriginalFieldToSerializerConfigSnapshot, // 
this mocks the previous field order
-                               new LinkedHashMap<Class<?>, 
TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test
-                               new HashMap<Class<?>, 
TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test
+                               new LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>(), // empty; 
irrelevant for this test
+                               new HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>>()); // empty; irrelevant for this test
 
                // reconfigure - check reconfiguration result and that fields 
are reordered to the previous order
-               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(
-
-                       mockPreviousConfigSnapshot);
+               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot);
                assertFalse(compatResult.isRequiresMigration());
                int i = 0;
                for (Field field : mockOriginalFieldOrder) {
@@ -508,4 +542,74 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                        i++;
                }
        }
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testSerializerSerializationFailureResilience() throws 
Exception{
+               PojoSerializer<TestUserClass> pojoSerializer = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+               // snapshot configuration and serialize to bytes
+               PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> 
config = pojoSerializer.snapshotConfiguration();
+               byte[] serializedConfig;
+               try (
+                       ByteArrayOutputStream out = new 
ByteArrayOutputStream()) {
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), config);
+                       serializedConfig = out.toByteArray();
+               }
+
+               // mock failure when deserializing serializers
+               
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                       
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+               doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+               
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+               // read configuration from bytes
+               PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> 
deserializedConfig;
+               try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
+                       deserializedConfig = 
(PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass>)
+                               
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+                                       new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
+               }
+
+               
Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration());
+               
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(config, 
deserializedConfig);
+       }
+
+       private static void 
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+                       PojoSerializer.PojoSerializerConfigSnapshot<?> original,
+                       PojoSerializer.PojoSerializerConfigSnapshot<?> 
deserializedConfig) {
+
+               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
+                               original.getFieldToSerializerConfigSnapshot();
+               for (Map.Entry<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
+                               : 
deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
+
+                       Assert.assertEquals(null, entry.getValue().f0);
+
+                       if (entry.getValue().f1 instanceof 
PojoSerializer.PojoSerializerConfigSnapshot) {
+                               
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+                                       
(PojoSerializer.PojoSerializerConfigSnapshot<?>) 
originalFieldSerializersAndConfs.get(entry.getKey()).f1,
+                                       
(PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1);
+                       } else {
+                               
Assert.assertEquals(originalFieldSerializersAndConfs.get(entry.getKey()).f1, 
entry.getValue().f1);
+                       }
+               }
+
+               LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> originalRegistrations =
+                               
original.getRegisteredSubclassesToSerializerConfigSnapshots();
+
+               for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
+                               : 
deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots().entrySet())
 {
+
+                       Assert.assertEquals(null, entry.getValue().f0);
+
+                       if (entry.getValue().f1 instanceof 
PojoSerializer.PojoSerializerConfigSnapshot) {
+                               
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+                                       
(PojoSerializer.PojoSerializerConfigSnapshot<?>) 
originalRegistrations.get(entry.getKey()).f1,
+                                       
(PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1);
+                       } else {
+                               
Assert.assertEquals(originalRegistrations.get(entry.getKey()).f1, 
entry.getValue().f1);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 860c560..5a404bd 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -25,7 +25,7 @@ import com.esotericsoftware.kryo.io.Output;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.Test;
@@ -53,7 +53,7 @@ public class KryoSerializerCompatibilityTest {
                TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = 
kryoSerializerForA.snapshotConfiguration();
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
                        serializedConfig = out.toByteArray();
                }
 
@@ -61,7 +61,7 @@ public class KryoSerializerCompatibilityTest {
 
                // read configuration again from bytes
                try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       kryoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       kryoSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
@@ -91,7 +91,7 @@ public class KryoSerializerCompatibilityTest {
                TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = 
kryoSerializer.snapshotConfiguration();
                byte[] serializedConfig;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                       TypeSerializerUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+                       
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new 
DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
                        serializedConfig = out.toByteArray();
                }
 
@@ -104,7 +104,7 @@ public class KryoSerializerCompatibilityTest {
 
                // read configuration from bytes
                try(ByteArrayInputStream in = new 
ByteArrayInputStream(serializedConfig)) {
-                       kryoSerializerConfigSnapshot = 
TypeSerializerUtil.readSerializerConfigSnapshot(
+                       kryoSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 

Reply via email to