Repository: flink
Updated Branches:
  refs/heads/release-1.3 0d9087df4 -> 09cc3f7c5


http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ff5a342..2be09ad 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -22,10 +22,12 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -811,7 +813,7 @@ public class NFA<T> implements Serializable {
        /**
         * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
         */
-       public static final class NFASerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+       public static final class NFASerializerConfigSnapshot<T> extends 
CompositeTypeSerializerConfigSnapshot {
 
                private static final int VERSION = 1;
 
@@ -819,10 +821,10 @@ public class NFA<T> implements Serializable {
                public NFASerializerConfigSnapshot() {}
 
                public NFASerializerConfigSnapshot(
-                               TypeSerializerConfigSnapshot 
sharedBufferSerializerConfigSnapshot,
-                               TypeSerializerConfigSnapshot 
eventSerializerConfigSnapshot) {
+                               TypeSerializer<T> eventSerializer,
+                               TypeSerializer<SharedBuffer<String, T>> 
sharedBufferSerializer) {
 
-                       super(sharedBufferSerializerConfigSnapshot, 
eventSerializerConfigSnapshot);
+                       super(eventSerializer, sharedBufferSerializer);
                }
 
                @Override
@@ -1062,29 +1064,36 @@ public class NFA<T> implements Serializable {
 
                @Override
                public TypeSerializerConfigSnapshot snapshotConfiguration() {
-                       return new NFASerializerConfigSnapshot(
-                                       eventSerializer.snapshotConfiguration(),
-                                       
sharedBufferSerializer.snapshotConfiguration()
-                       );
+                       return new 
NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer);
                }
+
+               @Override
                public CompatibilityResult<NFA<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                        if (configSnapshot instanceof 
NFASerializerConfigSnapshot) {
-                               TypeSerializerConfigSnapshot[] 
serializerConfigSnapshots =
-                                               ((NFASerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+                               List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersAndConfigs =
+                                               ((NFASerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                               CompatibilityResult<T> eventCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                               serializersAndConfigs.get(0).f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               serializersAndConfigs.get(0).f1,
+                                               eventSerializer);
 
-                               CompatibilityResult<T> elementCompatResult =
-                                               
eventSerializer.ensureCompatibility(serializerConfigSnapshots[0]);
                                CompatibilityResult<SharedBuffer<String, T>> 
sharedBufCompatResult =
-                                               
sharedBufferSerializer.ensureCompatibility(serializerConfigSnapshots[1]);
+                                               
CompatibilityUtil.resolveCompatibilityResult(
+                                                               
serializersAndConfigs.get(1).f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               
serializersAndConfigs.get(1).f1,
+                                                               
sharedBufferSerializer);
 
-                               if 
(!sharedBufCompatResult.isRequiresMigration() && 
!elementCompatResult.isRequiresMigration()) {
+                               if 
(!sharedBufCompatResult.isRequiresMigration() && 
!eventCompatResult.isRequiresMigration()) {
                                        return CompatibilityResult.compatible();
                                } else {
-                                       if 
(elementCompatResult.getConvertDeserializer() != null &&
+                                       if 
(eventCompatResult.getConvertDeserializer() != null &&
                                                        
sharedBufCompatResult.getConvertDeserializer() != null) {
                                                return 
CompatibilityResult.requiresMigration(
                                                                new 
NFASerializer<>(
-                                                                               
new TypeDeserializerAdapter<>(elementCompatResult.getConvertDeserializer()),
+                                                                               
new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
                                                                                
new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index d0f6bf4..91fce1f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -23,10 +23,13 @@ import com.google.common.collect.ListMultimap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -809,7 +812,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
        /**
         * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
         */
-       public static final class SharedBufferSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+       public static final class SharedBufferSerializerConfigSnapshot<K, V> 
extends CompositeTypeSerializerConfigSnapshot {
 
                private static final int VERSION = 1;
 
@@ -817,11 +820,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
                public SharedBufferSerializerConfigSnapshot() {}
 
                public SharedBufferSerializerConfigSnapshot(
-                               TypeSerializerConfigSnapshot 
keySerializerConfigSnapshot,
-                               TypeSerializerConfigSnapshot 
valueSerializerConfigSnapshot,
-                               TypeSerializerConfigSnapshot 
versionSerializerConfigSnapshot) {
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<V> valueSerializer,
+                               TypeSerializer<DeweyNumber> versionSerializer) {
 
-                       super(keySerializerConfigSnapshot, 
valueSerializerConfigSnapshot, versionSerializerConfigSnapshot);
+                       super(keySerializer, valueSerializer, 
versionSerializer);
                }
 
                @Override
@@ -1115,22 +1118,35 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
                @Override
                public TypeSerializerConfigSnapshot snapshotConfiguration() {
-                       return new SharedBufferSerializerConfigSnapshot(
-                                       keySerializer.snapshotConfiguration(),
-                                       valueSerializer.snapshotConfiguration(),
-                                       
versionSerializer.snapshotConfiguration()
-                       );
+                       return new SharedBufferSerializerConfigSnapshot<>(
+                                       keySerializer,
+                                       valueSerializer,
+                                       versionSerializer);
                }
 
                @Override
                public CompatibilityResult<SharedBuffer<K, V>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                        if (configSnapshot instanceof 
SharedBufferSerializerConfigSnapshot) {
-                               TypeSerializerConfigSnapshot[] 
serializerConfigSnapshots =
-                                               
((SharedBufferSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
-
-                               CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(serializerConfigSnapshots[0]);
-                               CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(serializerConfigSnapshots[1]);
-                               CompatibilityResult<DeweyNumber> 
versionCompatResult = 
versionSerializer.ensureCompatibility(serializerConfigSnapshots[2]);
+                               List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializerConfigSnapshots =
+                                               
((SharedBufferSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                               CompatibilityResult<K> keyCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                               
serializerConfigSnapshots.get(0).f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               
serializerConfigSnapshots.get(0).f1,
+                                               keySerializer);
+
+                               CompatibilityResult<V> valueCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                               
serializerConfigSnapshots.get(1).f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               
serializerConfigSnapshots.get(1).f1,
+                                               valueSerializer);
+
+                               CompatibilityResult<DeweyNumber> 
versionCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+                                               
serializerConfigSnapshots.get(2).f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               
serializerConfigSnapshots.get(2).f1,
+                                               versionSerializer);
 
                                if (!keyCompatResult.isRequiresMigration() && 
!valueCompatResult.isRequiresMigration() && 
!versionCompatResult.isRequiresMigration()) {
                                        return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 2ed7245..7b6e5e3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -22,10 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -482,14 +485,20 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
                @Override
                public TypeSerializerConfigSnapshot snapshotConfiguration() {
-                       return new 
CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+                       return new 
CollectionSerializerConfigSnapshot<>(elementSerializer);
                }
 
                @Override
                public CompatibilityResult<PriorityQueue<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                        if (configSnapshot instanceof 
CollectionSerializerConfigSnapshot) {
-                               CompatibilityResult<T> compatResult = 
elementSerializer.ensureCompatibility(
-                                               
((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+                               Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
+                                       ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
+
+                               CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                               
previousElemSerializerAndConfig.f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               
previousElemSerializerAndConfig.f1,
+                                               elementSerializer);
 
                                if (!compatResult.isRequiresMigration()) {
                                        return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 0fd3680..caf346c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -81,8 +81,7 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new CRowSerializer.CRowSerializerConfigSnapshot(
-      rowSerializer.snapshotConfiguration())
+    new CRowSerializer.CRowSerializerConfigSnapshot(rowSerializer)
   }
 
   override def ensureCompatibility(
@@ -90,8 +89,11 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
 
     configSnapshot match {
       case crowSerializerConfigSnapshot: 
CRowSerializer.CRowSerializerConfigSnapshot =>
-        val compatResult = rowSerializer.ensureCompatibility(
-            
crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
+        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
+          crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+          rowSerializer)
 
         if (compatResult.isRequiresMigration) {
           if (compatResult.getConvertDeserializer != null) {
@@ -114,8 +116,8 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
 object CRowSerializer {
 
   class CRowSerializerConfigSnapshot(
-      private var rowSerializerConfigSnapshot: TypeSerializerConfigSnapshot)
-    extends CompositeTypeSerializerConfigSnapshot(rowSerializerConfigSnapshot) 
{
+      private val rowSerializer: TypeSerializer[Row])
+    extends CompositeTypeSerializerConfigSnapshot(rowSerializer) {
 
     /** This empty nullary constructor is required for deserializing the configuration. */
     def this() = this(null)

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 56eb7ea..3c4f4b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -18,10 +18,13 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -143,14 +146,20 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 
        @Override
        public TypeSerializerConfigSnapshot snapshotConfiguration() {
-               return new 
CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+               return new 
CollectionSerializerConfigSnapshot<>(elementSerializer);
        }
 
        @Override
        public CompatibilityResult<ArrayList<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof 
CollectionSerializerConfigSnapshot) {
-                       CompatibilityResult<T> compatResult = 
elementSerializer.ensureCompatibility(
-                               ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
previousElemSerializerAndConfig =
+                               ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
+
+                       CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       previousElemSerializerAndConfig.f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousElemSerializerAndConfig.f1,
+                                       elementSerializer);
 
                        if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 1d3af72..eec2e93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -296,8 +296,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
                                for 
(RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : 
restoredMetaInfoSnapshots) {
 
                                        if 
(restoredMetaInfo.getPartitionStateSerializer() == null ||
-                                                       
restoredMetaInfo.getPartitionStateSerializer()
-                                                               instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+                                                       
restoredMetaInfo.getPartitionStateSerializer() instanceof 
UnloadableDummyTypeSerializer) {
 
                                                // must fail now if the previous serializer cannot be restored because there is no serializer
                                                // capable of reading previous state

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index b93c9e0..066684b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -20,16 +20,20 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -203,19 +207,26 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
 
        @Override
        public TypeSerializerConfigSnapshot snapshotConfiguration() {
-               return new MapSerializerConfigSnapshot(
-                               keySerializer.snapshotConfiguration(),
-                               valueSerializer.snapshotConfiguration());
+               return new MapSerializerConfigSnapshot<>(keySerializer, 
valueSerializer);
        }
 
        @Override
        public CompatibilityResult<HashMap<K, V>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof MapSerializerConfigSnapshot) {
-                       TypeSerializerConfigSnapshot[] 
keyValueSerializerConfigSnapshots =
-                               ((MapSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
-
-                       CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
-                       CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
+                               ((MapSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                       CompatibilityResult<K> keyCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
previousKvSerializersAndConfigs.get(0).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       
previousKvSerializersAndConfigs.get(0).f1,
+                                       keySerializer);
+
+                       CompatibilityResult<V> valueCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
previousKvSerializersAndConfigs.get(1).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       
previousKvSerializersAndConfigs.get(1).f1,
+                                       valueSerializer);
 
                        if (!keyCompatResult.isRequiresMigration() && 
!valueCompatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index f265f78..2ff8cb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -20,19 +20,16 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -94,20 +91,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
                super.write(out);
 
                // write in a way to be fault tolerant of read failures when deserializing the key serializer
-               try (
-                       ByteArrayOutputStreamWithPos buffer = new 
ByteArrayOutputStreamWithPos();
-                       DataOutputViewStreamWrapper bufferWrapper = new 
DataOutputViewStreamWrapper(buffer)){
-
-                       new 
TypeSerializerSerializationProxy<>(keySerializer).write(bufferWrapper);
-
-                       // write offset of key serializer's configuration 
snapshot
-                       out.writeInt(buffer.getPosition());
-                       
TypeSerializerUtil.writeSerializerConfigSnapshot(bufferWrapper, 
keySerializerConfigSnapshot);
-
-                       // flush buffer
-                       out.writeInt(buffer.getPosition());
-                       out.write(buffer.getBuf(), 0, buffer.getPosition());
-               }
+               
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+                               out,
+                               Collections.singletonList(
+                                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(keySerializer, keySerializerConfigSnapshot)));
 
                // write individual registered keyed state metainfos
                out.writeShort(stateMetaInfoSnapshots.size());
@@ -118,38 +105,19 @@ public class KeyedBackendSerializationProxy<K> extends 
VersionedIOReadableWritab
                }
        }
 
+       @SuppressWarnings("unchecked")
        @Override
        public void read(DataInputView in) throws IOException {
                super.read(in);
 
-               final TypeSerializerSerializationProxy<K> keySerializerProxy =
-                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
-
                // only starting from version 3, we have the key serializer and 
its config snapshot written
                if (getReadVersion() >= 3) {
-                       int keySerializerConfigSnapshotOffset = in.readInt();
-                       int numBufferedBytes = in.readInt();
-                       byte[] keySerializerAndConfigBytes = new 
byte[numBufferedBytes];
-                       in.readFully(keySerializerAndConfigBytes);
-
-                       try (
-                               ByteArrayInputStreamWithPos buffer = new 
ByteArrayInputStreamWithPos(keySerializerAndConfigBytes);
-                               DataInputViewStreamWrapper bufferWrapper = new 
DataInputViewStreamWrapper(buffer)) {
-
-                               try {
-                                       keySerializerProxy.read(bufferWrapper);
-                                       this.keySerializer = 
keySerializerProxy.getTypeSerializer();
-                               } catch (IOException e) {
-                                       this.keySerializer = null;
-                               }
-
-                               
buffer.setPosition(keySerializerConfigSnapshotOffset);
-                               this.keySerializerConfigSnapshot =
-                                       
TypeSerializerUtil.readSerializerConfigSnapshot(bufferWrapper, 
userCodeClassLoader);
-                       }
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
keySerializerAndConfig =
+                                       
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader).get(0);
+                       this.keySerializer = (TypeSerializer<K>) 
keySerializerAndConfig.f0;
+                       this.keySerializerConfigSnapshot = 
keySerializerAndConfig.f1;
                } else {
-                       keySerializerProxy.read(in);
-                       this.keySerializer = 
keySerializerProxy.getTypeSerializer();
+                       this.keySerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader);
                        this.keySerializerConfigSnapshot = null;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index ac81e86..9108306 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -19,19 +19,17 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * Readers and writers for different versions of the {@link 
RegisteredKeyedBackendStateMetaInfo.Snapshot}.
@@ -39,8 +37,6 @@ import java.io.IOException;
  */
 public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(KeyedBackendStateMetaInfoSnapshotReaderWriters.class);
-
        // 
-------------------------------------------------------------------------------
        //  Writers
        //   - v1: Flink 1.2.x
@@ -91,8 +87,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
                        out.writeInt(stateMetaInfo.getStateType().ordinal());
                        out.writeUTF(stateMetaInfo.getName());
 
-                       new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(out);
-                       new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(out);
+                       TypeSerializerSerializationUtil.writeSerializer(out, 
stateMetaInfo.getNamespaceSerializer());
+                       TypeSerializerSerializationUtil.writeSerializer(out, 
stateMetaInfo.getStateSerializer());
                }
        }
 
@@ -108,25 +104,13 @@ public class 
KeyedBackendStateMetaInfoSnapshotReaderWriters {
                        out.writeUTF(stateMetaInfo.getName());
 
                        // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
-                       try (
-                               ByteArrayOutputStreamWithPos outWithPos = new 
ByteArrayOutputStreamWithPos();
-                               DataOutputViewStreamWrapper outViewWrapper = 
new DataOutputViewStreamWrapper(outWithPos)) {
-
-                               new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(outViewWrapper);
-
-                               // write current offset, which represents the 
start offset of the state serializer
-                               out.writeInt(outWithPos.getPosition());
-                               new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(outViewWrapper);
-
-                               // write current offset, which represents the 
start of the configuration snapshots
-                               out.writeInt(outWithPos.getPosition());
-                               
TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, 
stateMetaInfo.getNamespaceSerializerConfigSnapshot());
-                               
TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, 
stateMetaInfo.getStateSerializerConfigSnapshot());
-
-                               // write total number of bytes and then flush
-                               out.writeInt(outWithPos.getPosition());
-                               out.write(outWithPos.getBuf(), 0, 
outWithPos.getPosition());
-                       }
+                       
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+                               out,
+                               Arrays.asList(
+                                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                                               
stateMetaInfo.getNamespaceSerializer(), 
stateMetaInfo.getNamespaceSerializerConfigSnapshot()),
+                                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                                               
stateMetaInfo.getStateSerializer(), 
stateMetaInfo.getStateSerializerConfigSnapshot())));
                }
        }
 
@@ -184,15 +168,8 @@ public class 
KeyedBackendStateMetaInfoSnapshotReaderWriters {
                        
metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
                        metaInfo.setName(in.readUTF());
 
-                       final TypeSerializerSerializationProxy<N> 
namespaceSerializerProxy =
-                               new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
-                       namespaceSerializerProxy.read(in);
-                       
metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
-
-                       final TypeSerializerSerializationProxy<S> 
stateSerializerProxy =
-                               new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
-                       stateSerializerProxy.read(in);
-                       
metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
+                       
metaInfo.setNamespaceSerializer(TypeSerializerSerializationUtil.<N>tryReadSerializer(in,
 userCodeClassLoader));
+                       
metaInfo.setStateSerializer(TypeSerializerSerializationUtil.<S>tryReadSerializer(in,
 userCodeClassLoader));
 
                        // older versions do not contain the configuration 
snapshot
                        metaInfo.setNamespaceSerializerConfigSnapshot(null);
@@ -202,6 +179,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters 
{
                }
        }
 
+       @SuppressWarnings("unchecked")
        static class KeyedBackendStateMetaInfoReaderV3<N, S> extends 
AbstractKeyedBackendStateMetaInfoReader {
 
                public KeyedBackendStateMetaInfoReaderV3(ClassLoader 
userCodeClassLoader) {
@@ -216,48 +194,14 @@ public class 
KeyedBackendStateMetaInfoSnapshotReaderWriters {
                        
metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
                        metaInfo.setName(in.readUTF());
 
-                       // read offsets
-                       int stateSerializerStartOffset = in.readInt();
-                       int configSnapshotsStartOffset = in.readInt();
-
-                       int totalBytes = in.readInt();
-
-                       byte[] buffer = new byte[totalBytes];
-                       in.readFully(buffer);
-
-                       ByteArrayInputStreamWithPos inWithPos = new 
ByteArrayInputStreamWithPos(buffer);
-                       DataInputViewStreamWrapper inViewWrapper = new 
DataInputViewStreamWrapper(inWithPos);
-
-                       try {
-                               final TypeSerializerSerializationProxy<N> 
namespaceSerializerProxy =
-                                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
-                               namespaceSerializerProxy.read(inViewWrapper);
-                               
metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
-                       } catch (IOException e) {
-                               LOG.warn("Deserialization of previous namespace 
serializer errored; setting serializer to null. ", e);
-
-                               metaInfo.setNamespaceSerializer(null);
-                       }
-
-                       // make sure we start from the state serializer bytes 
position
-                       inWithPos.setPosition(stateSerializerStartOffset);
-                       try {
-                               final TypeSerializerSerializationProxy<S> 
stateSerializerProxy =
-                                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
-                               stateSerializerProxy.read(inViewWrapper);
-                               
metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
-                       } catch (IOException e) {
-                               LOG.warn("Deserialization of previous state 
serializer errored; setting serializer to null. ", e);
-
-                               metaInfo.setStateSerializer(null);
-                       }
-
-                       // make sure we start from the config snapshot bytes 
position
-                       inWithPos.setPosition(configSnapshotsStartOffset);
-                       metaInfo.setNamespaceSerializerConfigSnapshot(
-                               
TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, 
userCodeClassLoader));
-                       metaInfo.setStateSerializerConfigSnapshot(
-                               
TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, 
userCodeClassLoader));
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersAndConfigs =
+                               
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader);
+
+                       metaInfo.setNamespaceSerializer((TypeSerializer<N>) 
serializersAndConfigs.get(0).f0);
+                       
metaInfo.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(0).f1);
+
+                       metaInfo.setStateSerializer((TypeSerializer<S>) 
serializersAndConfigs.get(1).f0);
+                       
metaInfo.setStateSerializerConfigSnapshot(serializersAndConfigs.get(1).f1);
 
                        return metaInfo;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index 4f151c9..e52323f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -19,21 +19,19 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 
 /**
  * Readers and writers for different versions of the {@link 
RegisteredOperatorBackendStateMetaInfo.Snapshot}.
@@ -91,7 +89,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                public void writeStateMetaInfo(DataOutputView out) throws 
IOException {
                        out.writeUTF(stateMetaInfo.getName());
                        
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
-                       new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(out);
+                       TypeSerializerSerializationUtil.writeSerializer(out, 
stateMetaInfo.getPartitionStateSerializer());
                }
        }
 
@@ -107,22 +105,11 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                        
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
 
                        // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
-                       try (
-                               ByteArrayOutputStreamWithPos outWithPos = new 
ByteArrayOutputStreamWithPos();
-                               DataOutputViewStreamWrapper outViewWrapper = 
new DataOutputViewStreamWrapper(outWithPos)) {
-
-                               new 
TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(outViewWrapper);
-
-                               // write the start offset of the config snapshot
-                               out.writeInt(outWithPos.getPosition());
-                               
TypeSerializerUtil.writeSerializerConfigSnapshot(
-                                       outViewWrapper,
-                                       
stateMetaInfo.getPartitionStateSerializerConfigSnapshot());
-
-                               // write the total number of bytes and flush
-                               out.writeInt(outWithPos.getPosition());
-                               out.write(outWithPos.getBuf(), 0, 
outWithPos.getPosition());
-                       }
+                       
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+                               out,
+                               Collections.singletonList(new 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+                                       
stateMetaInfo.getPartitionStateSerializer(),
+                                       
stateMetaInfo.getPartitionStateSerializerConfigSnapshot())));
                }
        }
 
@@ -192,6 +179,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
        }
 
+       @SuppressWarnings("unchecked")
        public static class OperatorBackendStateMetaInfoReaderV2<S> extends 
AbstractOperatorBackendStateMetaInfoReader<S> {
 
                public OperatorBackendStateMetaInfoReaderV2(ClassLoader 
userCodeClassLoader) {
@@ -206,32 +194,11 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                        stateMetaInfo.setName(in.readUTF());
                        
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
 
-                       // read start offset of configuration snapshot
-                       int configSnapshotStartOffset = in.readInt();
-
-                       int totalBytes = in.readInt();
-
-                       byte[] buffer = new byte[totalBytes];
-                       in.readFully(buffer);
-
-                       ByteArrayInputStreamWithPos inWithPos = new 
ByteArrayInputStreamWithPos(buffer);
-                       DataInputViewStreamWrapper inViewWrapper = new 
DataInputViewStreamWrapper(inWithPos);
-
-                       try {
-                               final TypeSerializerSerializationProxy<S> 
partitionStateSerializerProxy =
-                                       new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
-                               
partitionStateSerializerProxy.read(inViewWrapper);
-                               
stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer());
-                       } catch (IOException e) {
-                               LOG.warn("Deserialization of previous 
serializer errored; setting serializer to null. ", e);
-
-                               stateMetaInfo.setPartitionStateSerializer(null);
-                       }
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
stateSerializerAndConfig =
+                               
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader).get(0);
 
-                       // make sure we start from the partition state 
serializer bytes position
-                       inWithPos.setPosition(configSnapshotStartOffset);
-                       stateMetaInfo.setPartitionStateSerializerConfigSnapshot(
-                               
TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, 
userCodeClassLoader));
+                       
stateMetaInfo.setPartitionStateSerializer((TypeSerializer<S>) 
stateSerializerAndConfig.f0);
+                       
stateMetaInfo.setPartitionStateSerializerConfigSnapshot(stateSerializerAndConfig.f1);
 
                        return stateMetaInfo;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
deleted file mode 100644
index 39bb743..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-
-/**
- * Utilities related to state migration, commonly used in the state backends.
- */
-public class StateMigrationUtil {
-
-       /**
-        * Resolves the final compatibility result of two serializers by taking 
into account compound information,
-        * including the preceding serializer, the preceding serializer's 
configuration snapshot, and the new serializer.
-        *
-        * The final result is determined as follows:
-        *   1. If there is no configuration snapshot of the preceding 
serializer,
-        *      assumes the new serializer to be compatible.
-        *   2. Confront the configuration snapshot with the new serializer.
-        *   3. If the result is compatible, just return that as the result.
-        *   4. If not compatible and requires migration, check if the 
preceding serializer is valid.
-        *      If yes, use that as the convert deserializer for state 
migration.
-        *   5. If the preceding serializer is not valid, check if the result 
came with a convert deserializer.
-        *      If yes, use that for state migration and simply return the 
result.
-        *   6. If all of above fails, state migration is required but could 
not be performed; throw exception.
-        *
-        * @param precedingSerializer the preceding serializer used to write 
the data
-        * @param dummySerializerClassTag any class tags that identifies the 
preceding serializer as a dummy placeholder
-        * @param precedingSerializerConfigSnapshot configuration snapshot of 
the preceding serializer
-        * @param newSerializer the new serializer to ensure compatibility with
-        *
-        * @param <T> Type of the data handled by the serializers
-        *
-        * @return the final resolved compatibility result
-        */
-       public static <T> CompatibilityResult<T> resolveCompatibilityResult(
-                       TypeSerializer<T> precedingSerializer,
-                       Class<?> dummySerializerClassTag,
-                       TypeSerializerConfigSnapshot 
precedingSerializerConfigSnapshot,
-                       TypeSerializer<T> newSerializer) {
-
-               if (precedingSerializerConfigSnapshot != null) {
-                       CompatibilityResult<T> initialResult = 
newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
-
-                       if (!initialResult.isRequiresMigration()) {
-                               return initialResult;
-                       } else {
-                               if (precedingSerializer != null && 
!(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
-                                       // if the preceding serializer exists 
and is not a dummy, use
-                                       // that for converting instead of the 
provided convert deserializer
-                                       return 
CompatibilityResult.requiresMigration(precedingSerializer);
-                               } else if 
(initialResult.getConvertDeserializer() != null) {
-                                       return initialResult;
-                               } else {
-                                       throw new RuntimeException(
-                                               "State migration required, but 
there is no available serializer capable of reading previous data.");
-                               }
-                       }
-               } else {
-                       // if the configuration snapshot of the preceding 
serializer cannot be provided,
-                       // we can only simply assume that the new serializer is 
compatible
-                       return CompatibilityResult.compatible();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 3e5645b..ada6377 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -29,8 +29,9 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -54,7 +55,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -391,11 +391,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                if (!keySerializerRestored) {
                                        // check for key serializer 
compatibility; this also reconfigures the
                                        // key serializer to be compatible, if 
it is required and is possible
-                                       if 
(StateMigrationUtil.resolveCompatibilityResult(
-                                               
serializationProxy.getKeySerializer(),
-                                               
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
-                                               
serializationProxy.getKeySerializerConfigSnapshot(),
-                                               keySerializer)
+                                       if 
(CompatibilityUtil.resolveCompatibilityResult(
+                                                       
serializationProxy.getKeySerializer(),
+                                                       
UnloadableDummyTypeSerializer.class,
+                                                       
serializationProxy.getKeySerializerConfigSnapshot(),
+                                                       keySerializer)
                                                .isRequiresMigration()) {
 
                                                // TODO replace with state 
migration; note that key hash codes need to remain the same after migration
@@ -412,8 +412,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : 
restoredMetaInfos) {
 
                                        if 
(restoredMetaInfo.getStateSerializer() == null ||
-                                                       
restoredMetaInfo.getStateSerializer()
-                                                               instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+                                                       
restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer) 
{
 
                                                // must fail now if the 
previous serializer cannot be restored because there is no serializer
                                                // capable of reading previous 
state

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index fee97f4..f1f0406 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -61,7 +61,7 @@ import static org.mockito.Mockito.when;
  * Tests for the {@link 
org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, 
OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
+@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBackend> {
 
        @Override
@@ -268,9 +268,10 @@ public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBack
                                "testOperator");
 
                        // mock failure when deserializing serializer
-                       TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+                       
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                                       
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
                        doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-                       
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+                       
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
                        
operatorStateBackend.restore(Collections.singletonList(stateHandle));
 
@@ -320,9 +321,10 @@ public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBack
                
when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
 
                // mock failure when deserializing serializer
-               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                               
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
                doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+               
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
                try {
                        restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index af5f0b2..31b75c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -64,7 +64,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(OperatorBackendStateMetaInfoSnapshotReaderWriters.class)
+@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class OperatorStateBackendTest {
 
        private final ClassLoader classLoader = getClass().getClassLoader();
@@ -544,9 +544,10 @@ public class OperatorStateBackendTest {
                                "testOperator");
 
                        // mock failure when deserializing serializer
-                       TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+                       
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                                       
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
                        doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-                       
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+                       
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
                        
operatorStateBackend.restore(Collections.singletonList(stateHandle));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 3d5b210..920aa69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -44,10 +44,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({
-       KeyedBackendSerializationProxy.class,
-       KeyedBackendStateMetaInfoSnapshotReaderWriters.class,
-       OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
+@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class SerializationProxiesTest {
 
        @Test
@@ -116,9 +113,10 @@ public class SerializationProxiesTest {
                        new 
KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
                // mock failure when deserializing serializers
-               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                               
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
                doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+               
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
@@ -182,9 +180,10 @@ public class SerializationProxiesTest {
                }
 
                // mock failure when deserializing serializer
-               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                               
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
                doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+               
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
                        metaInfo = 
KeyedBackendStateMetaInfoSnapshotReaderWriters
@@ -279,9 +278,10 @@ public class SerializationProxiesTest {
                }
 
                // mock failure when deserializing serializer
-               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+                               
mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
                doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+               
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
                        metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 9736e81..9c45276 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -23,7 +23,9 @@ import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils._
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
 import org.apache.flink.api.scala.typeutils._
 import org.apache.flink.types.Value
 import org.apache.hadoop.io.Writable
@@ -152,6 +154,15 @@ private[flink] trait TypeInformationGen[C <: Context] {
             override def createInstance(fields: Array[AnyRef]): T = {
               instance.splice
             }
+
+            override def createSerializerInstance(
+                tupleClass: Class[T],
+                fieldSerializers: Array[TypeSerializer[_]]) = {
+              this.getClass
+                .getConstructors()(0)
+                .newInstance(tupleClass, fieldSerializers)
+                .asInstanceOf[CaseClassSerializer[T]]
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index 6096388..1899b13 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -106,11 +106,23 @@ package object scala {
           fieldSerializers(i) = types(i).createSerializer(executionConfig)
         }
 
-        new CaseClassSerializer[(T1, T2)](classOf[(T1, T2)], fieldSerializers) 
{
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[T1], fields(1).asInstanceOf[T2])
-          }
-        }
+        new Tuple2CaseClassSerializer[T1, T2](classOf[(T1, T2)], 
fieldSerializers)
       }
     }
+
+  class Tuple2CaseClassSerializer[T1, T2](
+      val clazz: Class[(T1, T2)],
+      fieldSerializers: Array[TypeSerializer[_]])
+    extends CaseClassSerializer[(T1, T2)](clazz, fieldSerializers) {
+
+    override def createInstance(fields: Array[AnyRef]) = {
+      (fields(0).asInstanceOf[T1], fields(1).asInstanceOf[T2])
+    }
+
+    override def createSerializerInstance(
+        tupleClass: Class[(T1, T2)],
+        fieldSerializers: Array[TypeSerializer[_]]) = {
+      new Tuple2CaseClassSerializer[T1, T2](tupleClass, fieldSerializers)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 6c4378a..1095aee 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, 
TypeDeserializerAdapter, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.api.common.typeutils._
 import 
org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
@@ -110,22 +110,29 @@ class EitherSerializer[A, B, T <: Either[A, B]](
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): EitherSerializerConfigSnapshot = {
-    new EitherSerializerConfigSnapshot(
-      leftSerializer.snapshotConfiguration(),
-      rightSerializer.snapshotConfiguration())
+  override def snapshotConfiguration(): EitherSerializerConfigSnapshot[A, B] = 
{
+    new EitherSerializerConfigSnapshot[A, B](leftSerializer, rightSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
 
     configSnapshot match {
-      case eitherSerializerConfig: EitherSerializerConfigSnapshot =>
-        val leftRightConfigs =
-          eitherSerializerConfig.getNestedSerializerConfigSnapshots
-
-        val leftCompatResult = 
leftSerializer.ensureCompatibility(leftRightConfigs(0))
-        val rightCompatResult = 
rightSerializer.ensureCompatibility(leftRightConfigs(1))
+      case eitherSerializerConfig: EitherSerializerConfigSnapshot[A, B] =>
+        val previousLeftRightSerWithConfigs =
+          eitherSerializerConfig.getNestedSerializersAndConfigs
+
+        val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+          previousLeftRightSerWithConfigs.get(0).f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          previousLeftRightSerWithConfigs.get(0).f1,
+          leftSerializer)
+
+        val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+          previousLeftRightSerWithConfigs.get(1).f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          previousLeftRightSerWithConfigs.get(1).f1,
+          rightSerializer)
 
         if (leftCompatResult.isRequiresMigration
             || rightCompatResult.isRequiresMigration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 4b56059..8adfb5c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -100,16 +100,20 @@ class OptionSerializer[A](val elemSerializer: 
TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): 
OptionSerializer.OptionSerializerConfigSnapshot = {
-    new 
OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer.snapshotConfiguration())
+  override def snapshotConfiguration(): 
OptionSerializer.OptionSerializerConfigSnapshot[A] = {
+    new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): 
CompatibilityResult[Option[A]] = {
     configSnapshot match {
-      case optionSerializerConfigSnapshot: 
OptionSerializer.OptionSerializerConfigSnapshot =>
-        val compatResult = elemSerializer.ensureCompatibility(
-          
optionSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
+      case optionSerializerConfigSnapshot
+          : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
+        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
+          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+          elemSerializer)
 
         if (compatResult.isRequiresMigration) {
           if (compatResult.getConvertDeserializer != null) {
@@ -130,9 +134,9 @@ class OptionSerializer[A](val elemSerializer: 
TypeSerializer[A])
 
 object OptionSerializer {
 
-  class OptionSerializerConfigSnapshot(
-      private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot)
-    extends 
CompositeTypeSerializerConfigSnapshot(elemSerializerConfigSnapshot) {
+  class OptionSerializerConfigSnapshot[A](
+      private val elemSerializer: TypeSerializer[A])
+    extends CompositeTypeSerializerConfigSnapshot(elemSerializer) {
 
     /** This empty nullary constructor is required for deserializing the 
configuration. */
     def this() = this(null)

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index 5de76ca..641caa1 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils._
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
-import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.KryoSerializerConfigSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 import scala.util.{Failure, Success, Try}
@@ -105,23 +104,28 @@ class TrySerializer[A](
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new TrySerializer.TrySerializerConfigSnapshot(
-        elemSerializer.snapshotConfiguration(),
-        throwableSerializer.snapshotConfiguration())
+    new TrySerializer.TrySerializerConfigSnapshot[A](elemSerializer, 
throwableSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): 
CompatibilityResult[Try[A]] = {
 
     configSnapshot match {
-      case trySerializerConfigSnapshot: 
TrySerializer.TrySerializerConfigSnapshot =>
-        val serializerConfigSnapshots =
-          trySerializerConfigSnapshot.getNestedSerializerConfigSnapshots
-
-        val elemCompatRes =
-          elemSerializer.ensureCompatibility(serializerConfigSnapshots(0))
-        val throwableCompatRes =
-          throwableSerializer.ensureCompatibility(serializerConfigSnapshots(1))
+      case trySerializerConfigSnapshot: 
TrySerializer.TrySerializerConfigSnapshot[A] =>
+        val previousSerializersAndConfigs =
+          trySerializerConfigSnapshot.getNestedSerializersAndConfigs
+
+        val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+          previousSerializersAndConfigs.get(0).f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          previousSerializersAndConfigs.get(0).f1,
+          elemSerializer)
+
+        val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+          previousSerializersAndConfigs.get(1).f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          previousSerializersAndConfigs.get(1).f1,
+          throwableSerializer)
 
         if (elemCompatRes.isRequiresMigration || 
throwableCompatRes.isRequiresMigration) {
           CompatibilityResult.requiresMigration()
@@ -136,11 +140,11 @@ class TrySerializer[A](
 
 object TrySerializer {
 
-  class TrySerializerConfigSnapshot(
-      private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot,
-      private var throwableSerializerConfigSnapshot: 
KryoSerializerConfigSnapshot[Throwable])
+  class TrySerializerConfigSnapshot[A](
+      private var elemSerializer: TypeSerializer[A],
+      private var throwableSerializer: TypeSerializer[Throwable])
     extends CompositeTypeSerializerConfigSnapshot(
-      elemSerializerConfigSnapshot, throwableSerializerConfigSnapshot) {
+      elemSerializer, throwableSerializer) {
 
     /** This empty nullary constructor is required for deserializing the 
configuration. */
     def this() = this(null, null)

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index dc23b8d..81ba33a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -23,10 +23,13 @@ import static java.util.Objects.requireNonNull;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -216,14 +219,20 @@ public class MultiplexingStreamRecordSerializer<T> 
extends TypeSerializer<Stream
 
        @Override
        public MultiplexingStreamRecordSerializerConfigSnapshot 
snapshotConfiguration() {
-               return new 
MultiplexingStreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+               return new 
MultiplexingStreamRecordSerializerConfigSnapshot<>(typeSerializer);
        }
 
        @Override
        public CompatibilityResult<StreamElement> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof 
MultiplexingStreamRecordSerializerConfigSnapshot) {
-                       CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
-                               
((MultiplexingStreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
previousTypeSerializerAndConfig =
+                               
((MultiplexingStreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
+
+                       CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       previousTypeSerializerAndConfig.f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousTypeSerializerAndConfig.f1,
+                                       typeSerializer);
 
                        if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
@@ -240,7 +249,7 @@ public class MultiplexingStreamRecordSerializer<T> extends 
TypeSerializer<Stream
        /**
         * Configuration snapshot specific to the {@link 
MultiplexingStreamRecordSerializer}.
         */
-       public static final class 
MultiplexingStreamRecordSerializerConfigSnapshot
+       public static final class 
MultiplexingStreamRecordSerializerConfigSnapshot<T>
                        extends CompositeTypeSerializerConfigSnapshot {
 
                private static final int VERSION = 1;
@@ -248,8 +257,8 @@ public class MultiplexingStreamRecordSerializer<T> extends 
TypeSerializer<Stream
                /** This empty nullary constructor is required for 
deserializing the configuration. */
                public MultiplexingStreamRecordSerializerConfigSnapshot() {}
 
-               public 
MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
typeSerializerConfigSnapshot) {
-                       super(typeSerializerConfigSnapshot);
+               public 
MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializer<T> 
typeSerializer) {
+                       super(typeSerializer);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 7b0390d..5c32c19 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -21,10 +21,13 @@ package 
org.apache.flink.migration.streaming.runtime.streamrecord;
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -155,14 +158,20 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
 
        @Override
        public StreamRecordSerializerConfigSnapshot snapshotConfiguration() {
-               return new 
StreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+               return new 
StreamRecordSerializerConfigSnapshot<>(typeSerializer);
        }
 
        @Override
        public CompatibilityResult<StreamRecord<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof 
StreamRecordSerializerConfigSnapshot) {
-                       CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
-                               ((StreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
previousTypeSerializerAndConfig =
+                               ((StreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
+
+                       CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       previousTypeSerializerAndConfig.f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousTypeSerializerAndConfig.f1,
+                                       typeSerializer);
 
                        if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
@@ -179,15 +188,15 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
        /**
         * Configuration snapshot specific to the {@link 
StreamRecordSerializer}.
         */
-       public static final class StreamRecordSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+       public static final class StreamRecordSerializerConfigSnapshot<T> 
extends CompositeTypeSerializerConfigSnapshot {
 
                private static final int VERSION = 1;
 
                /** This empty nullary constructor is required for 
deserializing the configuration. */
                public StreamRecordSerializerConfigSnapshot() {}
 
-               public 
StreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
typeSerializerConfigSnapshot) {
-                       super(typeSerializerConfigSnapshot);
+               public StreamRecordSerializerConfigSnapshot(TypeSerializer<T> 
typeSerializer) {
+                       super(typeSerializer);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index ba69fed..390ac9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -23,10 +23,13 @@ import static java.util.Objects.requireNonNull;
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -277,14 +280,20 @@ public final class StreamElementSerializer<T> extends 
TypeSerializer<StreamEleme
 
        @Override
        public StreamElementSerializerConfigSnapshot snapshotConfiguration() {
-               return new 
StreamElementSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+               return new 
StreamElementSerializerConfigSnapshot<>(typeSerializer);
        }
 
        @Override
        public CompatibilityResult<StreamElement> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof 
StreamElementSerializerConfigSnapshot) {
-                       CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
-                               ((StreamElementSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
previousTypeSerializerAndConfig =
+                               ((StreamElementSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
+
+                       CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       previousTypeSerializerAndConfig.f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousTypeSerializerAndConfig.f1,
+                                       typeSerializer);
 
                        if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();
@@ -301,15 +310,15 @@ public final class StreamElementSerializer<T> extends 
TypeSerializer<StreamEleme
        /**
         * Configuration snapshot specific to the {@link 
StreamElementSerializer}.
         */
-       public static final class StreamElementSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+       public static final class StreamElementSerializerConfigSnapshot<T> 
extends CompositeTypeSerializerConfigSnapshot {
 
                private static final int VERSION = 1;
 
                /** This empty nullary constructor is required for 
deserializing the configuration. */
                public StreamElementSerializerConfigSnapshot() {}
 
-               public 
StreamElementSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
typeSerializerConfigSnapshot) {
-                       super(typeSerializerConfigSnapshot);
+               public StreamElementSerializerConfigSnapshot(TypeSerializer<T> 
typeSerializer) {
+                       super(typeSerializer);
                }
 
                @Override

Reply via email to