[FLINK-6482] [core] Add nested serializers to config snapshots of composite 
serializers

This commit adds also the nested serializers themselves to the
configuration snapshots of composite serializers. This opens up the
oppurtunity to use the previous nested serializer as the convert
deserializer in the case that a nested serializer in the new serializer
determines that state migration is required.

This commit also consolidate all TypeSerializer-related serialization
proxies into a single utility class.

This closes #3937.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7bc5de9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7bc5de9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7bc5de9

Branch: refs/heads/master
Commit: a7bc5de9b17dde793c6da0e7cb700004af117148
Parents: 5624c70
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Thu May 18 14:51:33 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Mon May 22 23:24:26 2017 +0800

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  19 +-
 .../api/common/typeutils/CompatibilityUtil.java |  83 ++++
 .../CompositeTypeSerializerConfigSnapshot.java  |  45 +-
 .../TypeSerializerSerializationProxy.java       | 244 ----------
 .../TypeSerializerSerializationUtil.java        | 446 +++++++++++++++++++
 .../common/typeutils/TypeSerializerUtil.java    | 203 ---------
 .../UnloadableDummyTypeSerializer.java          | 130 ++++++
 .../CollectionSerializerConfigSnapshot.java     |  10 +-
 .../typeutils/base/GenericArraySerializer.java  |  15 +-
 .../GenericArraySerializerConfigSnapshot.java   |   6 +-
 .../common/typeutils/base/ListSerializer.java   |  15 +-
 .../common/typeutils/base/MapSerializer.java    |  27 +-
 .../base/MapSerializerConfigSnapshot.java       |  11 +-
 .../typeutils/runtime/EitherSerializer.java     |  27 +-
 .../runtime/EitherSerializerConfigSnapshot.java |  11 +-
 .../java/typeutils/runtime/PojoSerializer.java  | 397 ++++++++++++-----
 .../java/typeutils/runtime/RowSerializer.java   |  29 +-
 .../java/typeutils/runtime/TupleSerializer.java |   5 +
 .../typeutils/runtime/TupleSerializerBase.java  |  52 ++-
 .../runtime/TupleSerializerConfigSnapshot.java  |   9 +-
 .../common/typeutils/SerializerTestBase.java    |   4 +-
 .../TypeSerializerConfigSnapshotTest.java       | 147 ------
 .../TypeSerializerSerializationProxyTest.java   | 142 ------
 .../TypeSerializerSerializationUtilTest.java    | 295 ++++++++++++
 .../typeutils/base/EnumSerializerTest.java      |   6 +-
 .../typeutils/runtime/PojoSerializerTest.java   | 146 +++++-
 .../kryo/KryoSerializerCompatibilityTest.java   |  10 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  41 +-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  48 +-
 .../AbstractKeyedCEPPatternOperator.java        |  15 +-
 .../table/runtime/types/CRowSerializer.scala    |  14 +-
 .../runtime/state/ArrayListSerializer.java      |  15 +-
 .../state/DefaultOperatorStateBackend.java      |   5 +-
 .../flink/runtime/state/HashMapSerializer.java  |  27 +-
 .../state/KeyedBackendSerializationProxy.java   |  58 +--
 ...ckendStateMetaInfoSnapshotReaderWriters.java | 108 ++---
 ...ckendStateMetaInfoSnapshotReaderWriters.java |  63 +--
 .../flink/runtime/state/StateMigrationUtil.java |  83 ----
 .../state/heap/HeapKeyedStateBackend.java       |  17 +-
 .../runtime/state/MemoryStateBackendTest.java   |  14 +-
 .../runtime/state/OperatorStateBackendTest.java |   9 +-
 .../runtime/state/SerializationProxiesTest.java |  22 +-
 .../api/scala/codegen/TypeInformationGen.scala  |  11 +
 .../org/apache/flink/api/scala/package.scala    |  22 +-
 .../api/scala/typeutils/EitherSerializer.scala  |  29 +-
 .../api/scala/typeutils/OptionSerializer.scala  |  20 +-
 .../api/scala/typeutils/TrySerializer.scala     |  36 +-
 .../MultiplexingStreamRecordSerializer.java     |  21 +-
 .../streamrecord/StreamRecordSerializer.java    |  21 +-
 .../streamrecord/StreamElementSerializer.java   |  21 +-
 50 files changed, 1882 insertions(+), 1372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 1f32a89..51255ab 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -53,6 +53,7 @@ import 
org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -64,7 +65,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -1111,9 +1111,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        // check for key serializer compatibility; this also 
reconfigures the
                        // key serializer to be compatible, if it is required 
and is possible
-                       if (StateMigrationUtil.resolveCompatibilityResult(
+                       if (CompatibilityUtil.resolveCompatibilityResult(
                                        serializationProxy.getKeySerializer(),
-                                       
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                                       UnloadableDummyTypeSerializer.class,
                                        
serializationProxy.getKeySerializerConfigSnapshot(),
                                        rocksDBKeyedStateBackend.keySerializer)
                                .isRequiresMigration()) {
@@ -1230,9 +1230,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                // check for key serializer compatibility; this 
also reconfigures the
                                // key serializer to be compatible, if it is 
required and is possible
-                               if 
(StateMigrationUtil.resolveCompatibilityResult(
+                               if 
(CompatibilityUtil.resolveCompatibilityResult(
                                                
serializationProxy.getKeySerializer(),
-                                               
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                                               
UnloadableDummyTypeSerializer.class,
                                                
serializationProxy.getKeySerializerConfigSnapshot(),
                                                stateBackend.keySerializer)
                                        .isRequiresMigration()) {
@@ -1532,16 +1532,15 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
 
                        // check compatibility results to determine if state 
migration is required
-
-                       CompatibilityResult<?> namespaceCompatibility = 
StateMigrationUtil.resolveCompatibilityResult(
+                       CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
                                        
restoredMetaInfo.getNamespaceSerializer(),
                                        MigrationNamespaceSerializerProxy.class,
                                        
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
                                        newMetaInfo.getNamespaceSerializer());
 
-                       CompatibilityResult<S> stateCompatibility = 
StateMigrationUtil.resolveCompatibilityResult(
+                       CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
                                        restoredMetaInfo.getStateSerializer(),
-                                       
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                                       UnloadableDummyTypeSerializer.class,
                                        
restoredMetaInfo.getStateSerializerConfigSnapshot(),
                                        newMetaInfo.getStateSerializer());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
new file mode 100644
index 0000000..8959628
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Utilities related to serializer compatibility.
+ */
+@Internal
+public class CompatibilityUtil {
+
+       /**
+        * Resolves the final compatibility result of two serializers by taking 
into account compound information,
+        * including the preceding serializer, the preceding serializer's 
configuration snapshot, and the new serializer.
+        *
+        * The final result is determined as follows:
+        *   1. If there is no configuration snapshot of the preceding 
serializer,
+        *      assumes the new serializer to be compatible.
+        *   2. Confront the configuration snapshot with the new serializer.
+        *   3. If the result is compatible, just return that as the result.
+        *   4. If not compatible and requires migration, check if the 
preceding serializer is valid.
+        *      If yes, use that as the convert deserializer for state 
migration.
+        *   5. If the preceding serializer is not valid, check if the result 
came with a convert deserializer.
+        *      If yes, use that for state migration and simply return the 
result.
+        *   6. If all of above fails, state migration is required but could 
not be performed; throw exception.
+        *
+        * @param precedingSerializer the preceding serializer used to write 
the data
+        * @param dummySerializerClassTag any class tags that identifies the 
preceding serializer as a dummy placeholder
+        * @param precedingSerializerConfigSnapshot configuration snapshot of 
the preceding serializer
+        * @param newSerializer the new serializer to ensure compatibility with
+        *
+        * @param <T> Type of the data handled by the serializers
+        * 
+        * @return the final resolved compatibility result
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+                       TypeSerializer<?> precedingSerializer,
+                       Class<?> dummySerializerClassTag,
+                       TypeSerializerConfigSnapshot 
precedingSerializerConfigSnapshot,
+                       TypeSerializer<T> newSerializer) {
+
+               if (precedingSerializerConfigSnapshot != null) {
+                       CompatibilityResult<T> initialResult = 
newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+
+                       if (!initialResult.isRequiresMigration()) {
+                               return initialResult;
+                       } else {
+                               if (precedingSerializer != null && 
!(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
+                                       // if the preceding serializer exists 
and is not a dummy, use
+                                       // that for converting instead of the 
provided convert deserializer
+                                       return 
CompatibilityResult.requiresMigration((TypeSerializer<T>) precedingSerializer);
+                               } else if 
(initialResult.getConvertDeserializer() != null) {
+                                       return initialResult;
+                               } else {
+                                       throw new RuntimeException(
+                                               "State migration required, but 
there is no available serializer capable of reading previous data.");
+                               }
+                       }
+               } else {
+                       // if the configuration snapshot of the preceding 
serializer cannot be provided,
+                       // we can only simply assume that the new serializer is 
compatible
+                       return CompatibilityResult.compatible();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
index e7e2650..45b78c1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
@@ -19,47 +19,64 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A {@link TypeSerializerConfigSnapshot} for serializers that has multiple 
nested serializers.
- * The configuration snapshot consists of the configuration snapshots of all 
nested serializers.
+ * The configuration snapshot consists of the configuration snapshots of all 
nested serializers, and
+ * also the nested serializers themselves.
+ *
+ * <p>Both the nested serializers and the configuration snapshots are written 
as configuration of
+ * composite serializers, so that on restore, the previous serializer may be 
used in case migration
+ * is required.
  */
 @Internal
 public abstract class CompositeTypeSerializerConfigSnapshot extends 
TypeSerializerConfigSnapshot {
 
-       private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
+       private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
nestedSerializersAndConfigs;
 
        /** This empty nullary constructor is required for deserializing the 
configuration. */
        public CompositeTypeSerializerConfigSnapshot() {}
 
-       public 
CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... 
nestedSerializerConfigSnapshots) {
-               this.nestedSerializerConfigSnapshots = 
Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
+       public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... 
nestedSerializers) {
+               Preconditions.checkNotNull(nestedSerializers);
+
+               this.nestedSerializersAndConfigs = new 
ArrayList<>(nestedSerializers.length);
+               for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
+                       TypeSerializerConfigSnapshot configSnapshot = 
nestedSerializer.snapshotConfiguration();
+                       this.nestedSerializersAndConfigs.add(
+                               new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
+                                       nestedSerializer.duplicate(),
+                                       
Preconditions.checkNotNull(configSnapshot)));
+               }
        }
 
        @Override
        public void write(DataOutputView out) throws IOException {
                super.write(out);
-               TypeSerializerUtil.writeSerializerConfigSnapshots(out, 
nestedSerializerConfigSnapshots);
+               
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out, 
nestedSerializersAndConfigs);
        }
 
        @Override
        public void read(DataInputView in) throws IOException {
                super.read(in);
-               nestedSerializerConfigSnapshots = 
TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
+               this.nestedSerializersAndConfigs =
+                       
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
getUserCodeClassLoader());
        }
 
-       public TypeSerializerConfigSnapshot[] 
getNestedSerializerConfigSnapshots() {
-               return nestedSerializerConfigSnapshots;
+       public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
getNestedSerializersAndConfigs() {
+               return nestedSerializersAndConfigs;
        }
 
-       public TypeSerializerConfigSnapshot 
getSingleNestedSerializerConfigSnapshot() {
-               return nestedSerializerConfigSnapshots[0];
+       public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
getSingleNestedSerializerAndConfig() {
+               return nestedSerializersAndConfigs.get(0);
        }
 
        @Override
@@ -73,13 +90,11 @@ public abstract class CompositeTypeSerializerConfigSnapshot 
extends TypeSerializ
                }
 
                return (obj.getClass().equals(getClass()))
-                               && Arrays.equals(
-                                       nestedSerializerConfigSnapshots,
-                                       
((CompositeTypeSerializerConfigSnapshot) 
obj).getNestedSerializerConfigSnapshots());
+                               && 
nestedSerializersAndConfigs.equals(((CompositeTypeSerializerConfigSnapshot) 
obj).getNestedSerializersAndConfigs());
        }
 
        @Override
        public int hashCode() {
-               return Arrays.hashCode(nestedSerializerConfigSnapshots);
+               return nestedSerializersAndConfigs.hashCode();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
deleted file mode 100644
index 067a1ca..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.VersionedIOReadableWritable;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.util.Arrays;
-
-@Internal
-public class TypeSerializerSerializationProxy<T> extends 
VersionedIOReadableWritable {
-
-       public static final int VERSION = 1;
-       private static final Logger LOG = 
LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
-
-       private ClassLoader userClassLoader;
-
-       private TypeSerializer<T> typeSerializer;
-
-       private boolean ignoreClassNotFound;
-
-       public TypeSerializerSerializationProxy(ClassLoader userClassLoader, 
boolean ignoreClassNotFound) {
-               this.userClassLoader = userClassLoader;
-               this.ignoreClassNotFound = ignoreClassNotFound;
-       }
-
-       public TypeSerializerSerializationProxy(ClassLoader userClassLoader) {
-               this(userClassLoader, false);
-       }
-
-       public TypeSerializerSerializationProxy(TypeSerializer<T> 
typeSerializer) {
-               this.typeSerializer = 
Preconditions.checkNotNull(typeSerializer);
-               this.ignoreClassNotFound = false;
-       }
-
-       public TypeSerializer<T> getTypeSerializer() {
-               return typeSerializer;
-       }
-
-       @Override
-       public int getVersion() {
-               return VERSION;
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               super.write(out);
-
-               if (typeSerializer instanceof ClassNotFoundDummyTypeSerializer) 
{
-                       ClassNotFoundDummyTypeSerializer<T> dummyTypeSerializer 
=
-                                       (ClassNotFoundDummyTypeSerializer<T>) 
this.typeSerializer;
-
-                       byte[] serializerBytes = 
dummyTypeSerializer.getActualBytes();
-                       out.write(serializerBytes.length);
-                       out.write(serializerBytes);
-               } else {
-                       // write in a way that allows the stream to recover 
from exceptions
-                       try (ByteArrayOutputStreamWithPos streamWithPos = new 
ByteArrayOutputStreamWithPos()) {
-                               
InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
-                               out.writeInt(streamWithPos.getPosition());
-                               out.write(streamWithPos.getBuf(), 0, 
streamWithPos.getPosition());
-                       }
-               }
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               super.read(in);
-
-               // read in a way that allows the stream to recover from 
exceptions
-               int serializerBytes = in.readInt();
-               byte[] buffer = new byte[serializerBytes];
-               in.readFully(buffer);
-               try {
-                       typeSerializer = 
InstantiationUtil.deserializeObject(buffer, userClassLoader);
-               } catch (ClassNotFoundException | InvalidClassException e) {
-                       if (ignoreClassNotFound) {
-                               // we create a dummy so that all the 
information is not lost when we get a new checkpoint before receiving
-                               // a proper typeserializer from the user
-                               typeSerializer =
-                                               new 
ClassNotFoundDummyTypeSerializer<>(buffer);
-                               LOG.warn("Could not find requested 
TypeSerializer class in classpath. Created dummy.", e);
-                       } else {
-                               throw new IOException("Missing class for type 
serializer.", e);
-                       }
-               }
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               TypeSerializerSerializationProxy<?> that = 
(TypeSerializerSerializationProxy<?>) o;
-
-               return getTypeSerializer() != null ? 
getTypeSerializer().equals(that.getTypeSerializer()) : that.getTypeSerializer() 
== null;
-       }
-
-       @Override
-       public int hashCode() {
-               return getTypeSerializer() != null ? 
getTypeSerializer().hashCode() : 0;
-       }
-
-       public boolean isIgnoreClassNotFound() {
-               return ignoreClassNotFound;
-       }
-
-       public void setIgnoreClassNotFound(boolean ignoreClassNotFound) {
-               this.ignoreClassNotFound = ignoreClassNotFound;
-       }
-
-       /**
-        * Dummy TypeSerializer to avoid that data is lost when checkpointing 
again a serializer for which we encountered
-        * a {@link ClassNotFoundException}.
-        */
-       public static final class ClassNotFoundDummyTypeSerializer<T> extends 
TypeSerializer<T> {
-
-               private static final long serialVersionUID = 
2526330533671642711L;
-               private final byte[] actualBytes;
-
-               public ClassNotFoundDummyTypeSerializer(byte[] actualBytes) {
-                       this.actualBytes = 
Preconditions.checkNotNull(actualBytes);
-               }
-
-               public byte[] getActualBytes() {
-                       return actualBytes;
-               }
-
-               @Override
-               public boolean isImmutableType() {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public TypeSerializer<T> duplicate() {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public T createInstance() {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public T copy(T from) {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public T copy(T from, T reuse) {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public int getLength() {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public void serialize(T record, DataOutputView target) throws 
IOException {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public T deserialize(DataInputView source) throws IOException {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public T deserialize(T reuse, DataInputView source) throws 
IOException {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public TypeSerializerConfigSnapshot snapshotConfiguration() {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
-               }
-
-               @Override
-               public boolean canEqual(Object obj) {
-                       return false;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-
-                       ClassNotFoundDummyTypeSerializer<?> that = 
(ClassNotFoundDummyTypeSerializer<?>) o;
-
-                       return Arrays.equals(getActualBytes(), 
that.getActualBytes());
-               }
-
-               @Override
-               public int hashCode() {
-                       return Arrays.hashCode(getActualBytes());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
new file mode 100644
index 0000000..3d79d9a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializer} and {@link 
TypeSerializerConfigSnapshot}.
+ */
+@Internal
+public class TypeSerializerSerializationUtil {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TypeSerializerSerializationUtil.class);
+
+       /**
+        * Writes a {@link TypeSerializer} to the provided data output view.
+        *
+        * <p>It is written with a format that can be later read again using
+        * {@link #tryReadSerializer(DataInputView, ClassLoader, boolean)}.
+        *
+        * @param out the data output view.
+        * @param serializer the serializer to write.
+        *
+        * @param <T> Data type of the serializer.
+        *
+        * @throws IOException
+        */
+       public static <T> void writeSerializer(DataOutputView out, 
TypeSerializer<T> serializer) throws IOException {
+               new 
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(serializer).write(out);
+       }
+
+       /**
+        * Reads from a data input view a {@link TypeSerializer} that was 
previously
+        * written using {@link #writeSerializer(DataOutputView, 
TypeSerializer)}.
+        *
+        * <p>If deserialization fails for any reason (corrupted serializer 
bytes, serializer class
+        * no longer in classpath, serializer class no longer valid, etc.), 
{@code null} will
+        * be returned instead.
+        *
+        * @param in the data input view.
+        * @param userCodeClassLoader the user code class loader to use.
+        *
+        * @param <T> Data type of the serializer.
+        *
+        * @return the deserialized serializer.
+        */
+       public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, 
ClassLoader userCodeClassLoader) {
+               return tryReadSerializer(in, userCodeClassLoader, false);
+       }
+
+       /**
+        * Reads from a data input view a {@link TypeSerializer} that was 
previously
+        * written using {@link #writeSerializer(DataOutputView, 
TypeSerializer)}.
+        *
+        * <p>If deserialization fails due to {@link ClassNotFoundException} or 
{@link InvalidClassException},
+        * users can opt to use a dummy {@link UnloadableDummyTypeSerializer} 
to hold the serializer bytes,
+        * otherwise {@code null} is returned. If the failure is due to a 
{@link java.io.StreamCorruptedException},
+        * then {@code null} is returned.
+        *
+        * @param in the data input view.
+        * @param userCodeClassLoader the user code class loader to use.
+        * @param useDummyPlaceholder whether or not to use a dummy {@link 
UnloadableDummyTypeSerializer} to hold the
+        *                            serializer bytes in the case of a {@link 
ClassNotFoundException} or
+        *                            {@link InvalidClassException}.
+        *
+        * @param <T> Data type of the serializer.
+        *
+        * @return the deserialized serializer.
+        */
+       public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, 
ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
+               final 
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<T> proxy =
+                       new 
TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader,
 useDummyPlaceholder);
+
+               try {
+                       proxy.read(in);
+                       return proxy.getTypeSerializer();
+               } catch (IOException e) {
+                       LOG.warn("Deserialization of serializer errored; 
replacing with null.", e);
+
+                       return null;
+               }
+       }
+
+       /**
+        * Write a list of serializers and their corresponding config snapshots 
to the provided
+        * data output view. This method writes in a fault tolerant way, so 
that when read again
+        * using {@link #readSerializersAndConfigsWithResilience(DataInputView, 
ClassLoader)}, if
+        * deserialization of the serializer fails, its configuration snapshot 
will remain intact.
+        *
+        * <p>Specifically, all written serializers and their config snapshots 
are indexed by their
+        * offset positions within the serialized bytes. The serialization 
format is as follows:
+        * <ul>
+        *     <li>1. number of serializer and configuration snapshot 
pairs.</li>
+        *     <li>2. offsets of each serializer and configuration snapshot, in 
order.</li>
+        *     <li>3. total number of bytes for the serialized serializers and 
the config snapshots.</li>
+        *     <li>4. serialized serializers and the config snapshots.</li>
+        * </ul>
+        *
+        * @param out the data output view.
+        * @param serializersAndConfigs serializer and configuration snapshot 
pairs
+        *
+        * @throws IOException
+        */
+       public static void writeSerializersAndConfigsWithResilience(
+                       DataOutputView out,
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersAndConfigs) throws IOException {
+
+               try (
+                       ByteArrayOutputStreamWithPos bufferWithPos = new 
ByteArrayOutputStreamWithPos();
+                       DataOutputViewStreamWrapper bufferWrapper = new 
DataOutputViewStreamWrapper(bufferWithPos)) {
+
+                       out.writeInt(serializersAndConfigs.size());
+                       for (Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> serAndConfSnapshot : serializersAndConfigs) {
+                               out.writeInt(bufferWithPos.getPosition());
+                               writeSerializer(bufferWrapper, 
serAndConfSnapshot.f0);
+
+                               out.writeInt(bufferWithPos.getPosition());
+                               writeSerializerConfigSnapshot(bufferWrapper, 
serAndConfSnapshot.f1);
+                       }
+
+                       out.writeInt(bufferWithPos.getPosition());
+                       out.write(bufferWithPos.getBuf(), 0, 
bufferWithPos.getPosition());
+               }
+       }
+
+       /**
+        * Reads from a data input view a list of serializers and their 
corresponding config snapshots
+        * written using {@link 
#writeSerializersAndConfigsWithResilience(DataOutputView, List)}.
+        * This is fault tolerant to any failures when deserializing the 
serializers. Serializers which
+        * were not successfully deserialized will be replaced by {@code null}.
+        *
+        * @param in the data input view.
+        * @param userCodeClassLoader the user code class loader to use.
+        *
+        * @return the deserialized serializer and config snapshot pairs.
+        *
+        * @throws IOException
+        */
+       public static List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> readSerializersAndConfigsWithResilience(
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader) throws IOException {
+
+               int numSerializersAndConfigSnapshots = in.readInt();
+
+               int[] offsets = new int[numSerializersAndConfigSnapshots * 2];
+
+               for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
+                       offsets[i * 2] = in.readInt();
+                       offsets[i * 2 + 1] = in.readInt();
+               }
+
+               int totalBytes = in.readInt();
+               byte[] buffer = new byte[totalBytes];
+               in.readFully(buffer);
+
+               List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
serializersAndConfigSnapshots =
+                       new ArrayList<>(numSerializersAndConfigSnapshots);
+
+               TypeSerializer<?> serializer;
+               TypeSerializerConfigSnapshot configSnapshot;
+               try (
+                       ByteArrayInputStreamWithPos bufferWithPos = new 
ByteArrayInputStreamWithPos(buffer);
+                       DataInputViewStreamWrapper bufferWrapper = new 
DataInputViewStreamWrapper(bufferWithPos)) {
+
+                       for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
+
+                               bufferWithPos.setPosition(offsets[i * 2]);
+                               serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+
+                               bufferWithPos.setPosition(offsets[i * 2 + 1]);
+                               configSnapshot = 
readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader);
+
+                               serializersAndConfigSnapshots.add(
+                                       new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(serializer, configSnapshot));
+                       }
+               }
+
+               return serializersAndConfigSnapshots;
+       }
+
+       /**
+        * Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+        *
+        * <p>It is written with a format that can be later read again using
+        * {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+        *
+        * @param out the data output view
+        * @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+        *
+        * @throws IOException
+        */
+       public static void writeSerializerConfigSnapshot(
+                       DataOutputView out,
+                       TypeSerializerConfigSnapshot serializerConfigSnapshot) 
throws IOException {
+
+               new 
TypeSerializerConfigSnapshotSerializationProxy(serializerConfigSnapshot).write(out);
+       }
+
+       /**
+        * Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+        * written using {@link #writeSerializerConfigSnapshot(DataOutputView, 
TypeSerializerConfigSnapshot)}.
+        *
+        * @param in the data input view
+        * @param userCodeClassLoader the user code class loader to use
+        *
+        * @return the read serializer configuration snapshot
+        *
+        * @throws IOException
+        */
+       public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader) throws IOException {
+
+               final TypeSerializerConfigSnapshotSerializationProxy proxy = 
new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+               proxy.read(in);
+
+               return proxy.getSerializerConfigSnapshot();
+       }
+
+       /**
+        * Writes multiple {@link TypeSerializerConfigSnapshot}s to the 
provided data output view.
+        *
+        * <p>It is written with a format that can be later read again using
+        * {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
+        *
+        * @param out the data output view
+        * @param serializerConfigSnapshots the serializer configuration 
snapshots to write
+        *
+        * @throws IOException
+        */
+       public static void writeSerializerConfigSnapshots(
+                       DataOutputView out,
+                       TypeSerializerConfigSnapshot... 
serializerConfigSnapshots) throws IOException {
+
+               out.writeInt(serializerConfigSnapshots.length);
+
+               for (TypeSerializerConfigSnapshot snapshot : 
serializerConfigSnapshots) {
+                       new 
TypeSerializerConfigSnapshotSerializationProxy(snapshot).write(out);
+               }
+       }
+
+       /**
+        * Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that was previously
+        * written using {@link #writeSerializerConfigSnapshot(DataOutputView, 
TypeSerializerConfigSnapshot)}.
+        *
+        * @param in the data input view
+        * @param userCodeClassLoader the user code class loader to use
+        *
+        * @return the read serializer configuration snapshots
+        *
+        * @throws IOException
+        */
+       public static TypeSerializerConfigSnapshot[] 
readSerializerConfigSnapshots(
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader) throws IOException {
+
+               int numFields = in.readInt();
+               final TypeSerializerConfigSnapshot[] serializerConfigSnapshots 
= new TypeSerializerConfigSnapshot[numFields];
+
+               TypeSerializerConfigSnapshotSerializationProxy proxy;
+               for (int i = 0; i < numFields; i++) {
+                       proxy = new 
TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+                       proxy.read(in);
+                       serializerConfigSnapshots[i] = 
proxy.getSerializerConfigSnapshot();
+               }
+
+               return serializerConfigSnapshots;
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------
+
+       /**
+        * Utility serialization proxy for a {@link TypeSerializer}.
+        */
+       public static final class TypeSerializerSerializationProxy<T> extends 
VersionedIOReadableWritable {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
+
+               private static final int VERSION = 1;
+
+               private ClassLoader userClassLoader;
+               private TypeSerializer<T> typeSerializer;
+               private boolean useDummyPlaceholder;
+
+               public TypeSerializerSerializationProxy(ClassLoader 
userClassLoader, boolean useDummyPlaceholder) {
+                       this.userClassLoader = userClassLoader;
+                       this.useDummyPlaceholder = useDummyPlaceholder;
+               }
+
+               public TypeSerializerSerializationProxy(ClassLoader 
userClassLoader) {
+                       this(userClassLoader, false);
+               }
+
+               public TypeSerializerSerializationProxy(TypeSerializer<T> 
typeSerializer) {
+                       this.typeSerializer = 
Preconditions.checkNotNull(typeSerializer);
+                       this.useDummyPlaceholder = false;
+               }
+
+               public TypeSerializer<T> getTypeSerializer() {
+                       return typeSerializer;
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       if (typeSerializer instanceof 
UnloadableDummyTypeSerializer) {
+                               UnloadableDummyTypeSerializer<T> 
dummyTypeSerializer =
+                                       (UnloadableDummyTypeSerializer<T>) 
this.typeSerializer;
+
+                               byte[] serializerBytes = 
dummyTypeSerializer.getActualBytes();
+                               out.write(serializerBytes.length);
+                               out.write(serializerBytes);
+                       } else {
+                               // write in a way that allows the stream to 
recover from exceptions
+                               try (ByteArrayOutputStreamWithPos streamWithPos 
= new ByteArrayOutputStreamWithPos()) {
+                                       
InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
+                                       
out.writeInt(streamWithPos.getPosition());
+                                       out.write(streamWithPos.getBuf(), 0, 
streamWithPos.getPosition());
+                               }
+                       }
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       // read in a way that allows the stream to recover from 
exceptions
+                       int serializerBytes = in.readInt();
+                       byte[] buffer = new byte[serializerBytes];
+                       in.readFully(buffer);
+                       try {
+                               typeSerializer = 
InstantiationUtil.deserializeObject(buffer, userClassLoader);
+                       } catch (ClassNotFoundException | InvalidClassException 
e) {
+                               if (useDummyPlaceholder) {
+                                       // we create a dummy so that all the 
information is not lost when we get a new checkpoint before receiving
+                                       // a proper typeserializer from the user
+                                       typeSerializer =
+                                               new 
UnloadableDummyTypeSerializer<>(buffer);
+                                       LOG.warn("Could not find requested 
TypeSerializer class in classpath. Created dummy.", e);
+                               } else {
+                                       throw new IOException("Missing class 
for type serializer.", e);
+                               }
+                       }
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
+
+       /**
+        * Utility serialization proxy for a {@link 
TypeSerializerConfigSnapshot}.
+        */
+       static final class TypeSerializerConfigSnapshotSerializationProxy 
extends VersionedIOReadableWritable {
+
+               private static final int VERSION = 1;
+
+               private ClassLoader userCodeClassLoader;
+               private TypeSerializerConfigSnapshot serializerConfigSnapshot;
+
+               TypeSerializerConfigSnapshotSerializationProxy(ClassLoader 
userCodeClassLoader) {
+                       this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
+               }
+
+               
TypeSerializerConfigSnapshotSerializationProxy(TypeSerializerConfigSnapshot 
serializerConfigSnapshot) {
+                       this.serializerConfigSnapshot = 
serializerConfigSnapshot;
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       // config snapshot class, so that we can re-instantiate 
the
+                       // correct type of config snapshot instance when 
deserializing
+                       
out.writeUTF(serializerConfigSnapshot.getClass().getName());
+
+                       // the actual configuration parameters
+                       serializerConfigSnapshot.write(out);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       String serializerConfigClassname = in.readUTF();
+                       Class<? extends TypeSerializerConfigSnapshot> 
serializerConfigSnapshotClass;
+                       try {
+                               serializerConfigSnapshotClass = (Class<? 
extends TypeSerializerConfigSnapshot>)
+                                       
Class.forName(serializerConfigClassname, true, userCodeClassLoader);
+                       } catch (ClassNotFoundException e) {
+                               throw new IOException(
+                                       "Could not find requested 
TypeSerializerConfigSnapshot class "
+                                               + serializerConfigClassname +  
" in classpath.", e);
+                       }
+
+                       serializerConfigSnapshot = 
InstantiationUtil.instantiate(serializerConfigSnapshotClass);
+                       
serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
+                       serializerConfigSnapshot.read(in);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               TypeSerializerConfigSnapshot getSerializerConfigSnapshot() {
+                       return serializerConfigSnapshot;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
deleted file mode 100644
index 0a2148a..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.VersionedIOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Utility methods for {@link TypeSerializer} and {@link 
TypeSerializerConfigSnapshot}.
- */
-@Internal
-public class TypeSerializerUtil {
-
-       /**
-        * Creates an array of {@link TypeSerializerConfigSnapshot}s taken
-        * from the provided array of {@link TypeSerializer}s.
-        *
-        * @param serializers array of type serializers.
-        *
-        * @return array of configuration snapshots taken from each serializer.
-        */
-       public static TypeSerializerConfigSnapshot[] 
snapshotConfigurations(TypeSerializer<?>[] serializers) {
-               final TypeSerializerConfigSnapshot[] configSnapshots = new 
TypeSerializerConfigSnapshot[serializers.length];
-
-               for (int i = 0; i < serializers.length; i++) {
-                       configSnapshots[i] = 
serializers[i].snapshotConfiguration();
-               }
-
-               return configSnapshots;
-       }
-
-       /**
-        * Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
-        *
-        * <p>It is written with a format that can be later read again using
-        * {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
-        *
-        * @param out the data output view
-        * @param serializerConfigSnapshot the serializer configuration 
snapshot to write
-        *
-        * @throws IOException
-        */
-       public static void writeSerializerConfigSnapshot(
-                       DataOutputView out,
-                       TypeSerializerConfigSnapshot serializerConfigSnapshot) 
throws IOException {
-
-               new 
TypeSerializerConfigSnapshotProxy(serializerConfigSnapshot).write(out);
-       }
-
-       /**
-        * Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
-        * written using {@link #writeSerializerConfigSnapshot(DataOutputView, 
TypeSerializerConfigSnapshot)}.
-        *
-        * @param in the data input view
-        * @param userCodeClassLoader the user code class loader to use
-        *
-        * @return the read serializer configuration snapshot
-        *
-        * @throws IOException
-        */
-       public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
-                       DataInputView in,
-                       ClassLoader userCodeClassLoader) throws IOException {
-
-               final TypeSerializerConfigSnapshotProxy proxy = new 
TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
-               proxy.read(in);
-
-               return proxy.getSerializerConfigSnapshot();
-       }
-
-       /**
-        * Writes multiple {@link TypeSerializerConfigSnapshot}s to the 
provided data output view.
-        *
-        * <p>It is written with a format that can be later read again using
-        * {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
-        *
-        * @param out the data output view
-        * @param serializerConfigSnapshots the serializer configuration 
snapshots to write
-        *
-        * @throws IOException
-        */
-       public static void writeSerializerConfigSnapshots(
-                       DataOutputView out,
-                       TypeSerializerConfigSnapshot... 
serializerConfigSnapshots) throws IOException {
-
-               out.writeInt(serializerConfigSnapshots.length);
-
-               for (TypeSerializerConfigSnapshot snapshot : 
serializerConfigSnapshots) {
-                       new 
TypeSerializerConfigSnapshotProxy(snapshot).write(out);
-               }
-       }
-
-       /**
-        * Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that was previously
-        * written using {@link #writeSerializerConfigSnapshot(DataOutputView, 
TypeSerializerConfigSnapshot)}.
-        *
-        * @param in the data input view
-        * @param userCodeClassLoader the user code class loader to use
-        *
-        * @return the read serializer configuration snapshots
-        *
-        * @throws IOException
-        */
-       public static TypeSerializerConfigSnapshot[] 
readSerializerConfigSnapshots(
-                       DataInputView in,
-                       ClassLoader userCodeClassLoader) throws IOException {
-
-               int numFields = in.readInt();
-               final TypeSerializerConfigSnapshot[] serializerConfigSnapshots 
= new TypeSerializerConfigSnapshot[numFields];
-
-               TypeSerializerConfigSnapshotProxy proxy;
-               for (int i = 0; i < numFields; i++) {
-                       proxy = new 
TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
-                       proxy.read(in);
-                       serializerConfigSnapshots[i] = 
proxy.getSerializerConfigSnapshot();
-               }
-
-               return serializerConfigSnapshots;
-       }
-
-       /**
-        * Utility serialization proxy for a {@link 
TypeSerializerConfigSnapshot}.
-        */
-       static class TypeSerializerConfigSnapshotProxy extends 
VersionedIOReadableWritable {
-
-               private static final int VERSION = 1;
-
-               private ClassLoader userCodeClassLoader;
-               private TypeSerializerConfigSnapshot serializerConfigSnapshot;
-
-               TypeSerializerConfigSnapshotProxy(ClassLoader 
userCodeClassLoader) {
-                       this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
-               }
-
-               TypeSerializerConfigSnapshotProxy(TypeSerializerConfigSnapshot 
serializerConfigSnapshot) {
-                       this.serializerConfigSnapshot = 
serializerConfigSnapshot;
-               }
-
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       super.write(out);
-
-                       // config snapshot class, so that we can re-instantiate 
the
-                       // correct type of config snapshot instance when 
deserializing
-                       
out.writeUTF(serializerConfigSnapshot.getClass().getName());
-
-                       // the actual configuration parameters
-                       serializerConfigSnapshot.write(out);
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       super.read(in);
-
-                       String serializerConfigClassname = in.readUTF();
-                       Class<? extends TypeSerializerConfigSnapshot> 
serializerConfigSnapshotClass;
-                       try {
-                               serializerConfigSnapshotClass = (Class<? 
extends TypeSerializerConfigSnapshot>)
-                                       
Class.forName(serializerConfigClassname, true, userCodeClassLoader);
-                       } catch (ClassNotFoundException e) {
-                               throw new IOException(
-                                       "Could not find requested 
TypeSerializerConfigSnapshot class "
-                                               + serializerConfigClassname +  
" in classpath.", e);
-                       }
-
-                       serializerConfigSnapshot = 
InstantiationUtil.instantiate(serializerConfigSnapshotClass);
-                       
serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
-                       serializerConfigSnapshot.read(in);
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-
-               TypeSerializerConfigSnapshot getSerializerConfigSnapshot() {
-                       return serializerConfigSnapshot;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
new file mode 100644
index 0000000..ddfeab4
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.Arrays;
+
+/**
+ * Dummy TypeSerializer to avoid that data is lost when checkpointing again a 
serializer for which we encountered
+ * a {@link ClassNotFoundException} or {@link InvalidClassException}.
+ */
+public class UnloadableDummyTypeSerializer<T> extends TypeSerializer<T> {
+
+       private static final long serialVersionUID = 2526330533671642711L;
+       private final byte[] actualBytes;
+
+       public UnloadableDummyTypeSerializer(byte[] actualBytes) {
+               this.actualBytes = Preconditions.checkNotNull(actualBytes);
+       }
+
+       public byte[] getActualBytes() {
+               return actualBytes;
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public T createInstance() {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public T copy(T from) {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public int getLength() {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return false;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               UnloadableDummyTypeSerializer<?> that = 
(UnloadableDummyTypeSerializer<?>) o;
+
+               return Arrays.equals(getActualBytes(), that.getActualBytes());
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(getActualBytes());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 8fa2315..5572985 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -20,21 +20,23 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  * Configuration snapshot of a serializer for collection types.
+ *
+ * @param <T> Type of the element.
  */
 @Internal
-public final class CollectionSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+public final class CollectionSerializerConfigSnapshot<T> extends 
CompositeTypeSerializerConfigSnapshot {
 
        private static final int VERSION = 1;
 
        /** This empty nullary constructor is required for deserializing the 
configuration. */
        public CollectionSerializerConfigSnapshot() {}
 
-       public CollectionSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
elementSerializerConfigSnapshot) {
-               super(elementSerializerConfigSnapshot);
+       public CollectionSerializerConfigSnapshot(TypeSerializer<T> 
elementSerializer) {
+               super(elementSerializer);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 54c604c..cdfc964 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -23,9 +23,12 @@ import java.lang.reflect.Array;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -196,7 +199,7 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
 
        @Override
        public GenericArraySerializerConfigSnapshot snapshotConfiguration() {
-               return new 
GenericArraySerializerConfigSnapshot<>(componentClass, 
componentSerializer.snapshotConfiguration());
+               return new 
GenericArraySerializerConfigSnapshot<>(componentClass, componentSerializer);
        }
 
        @Override
@@ -205,8 +208,14 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
                        final GenericArraySerializerConfigSnapshot config = 
(GenericArraySerializerConfigSnapshot) configSnapshot;
 
                        if (componentClass.equals(config.getComponentClass())) {
-                               CompatibilityResult<C> compatResult = 
componentSerializer.ensureCompatibility(
-                                       
config.getSingleNestedSerializerConfigSnapshot());
+                               Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> previousComponentSerializerAndConfig =
+                                       
config.getSingleNestedSerializerAndConfig();
+
+                               CompatibilityResult<C> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                               
previousComponentSerializerAndConfig.f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               
previousComponentSerializerAndConfig.f1,
+                                               componentSerializer);
 
                                if (!compatResult.isRequiresMigration()) {
                                        return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index e78eb6c..79dcf89 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -47,9 +47,9 @@ public final class GenericArraySerializerConfigSnapshot<C> 
extends CompositeType
 
        public GenericArraySerializerConfigSnapshot(
                        Class<C> componentClass,
-                       TypeSerializerConfigSnapshot 
componentSerializerConfigSnapshot) {
+                       TypeSerializer<C> componentSerializer) {
 
-               super(componentSerializerConfigSnapshot);
+               super(componentSerializer);
 
                this.componentClass = 
Preconditions.checkNotNull(componentClass);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 1f271fe..c2b935c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -20,9 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -177,14 +180,20 @@ public final class ListSerializer<T> extends 
TypeSerializer<List<T>> {
 
        @Override
        public CollectionSerializerConfigSnapshot snapshotConfiguration() {
-               return new 
CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+               return new 
CollectionSerializerConfigSnapshot<>(elementSerializer);
        }
 
        @Override
        public CompatibilityResult<List<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof 
CollectionSerializerConfigSnapshot) {
-                       CompatibilityResult<T> compatResult = 
elementSerializer.ensureCompatibility(
-                               ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
previousElemSerializerAndConfig =
+                               ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
+
+                       CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       previousElemSerializerAndConfig.f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       previousElemSerializerAndConfig.f1,
+                                       elementSerializer);
 
                        if (!compatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index d5d6ec8..23b494b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -20,14 +20,18 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -202,19 +206,26 @@ public final class MapSerializer<K, V> extends 
TypeSerializer<Map<K, V>> {
 
        @Override
        public MapSerializerConfigSnapshot snapshotConfiguration() {
-               return new MapSerializerConfigSnapshot(
-                               keySerializer.snapshotConfiguration(),
-                               valueSerializer.snapshotConfiguration());
+               return new MapSerializerConfigSnapshot<>(keySerializer, 
valueSerializer);
        }
 
        @Override
        public CompatibilityResult<Map<K, V>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof MapSerializerConfigSnapshot) {
-                       TypeSerializerConfigSnapshot[] 
keyValueSerializerConfigSnapshots =
-                               ((MapSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
-
-                       CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
-                       CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
+                               ((MapSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                       CompatibilityResult<K> keyCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
previousKvSerializersAndConfigs.get(0).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       
previousKvSerializersAndConfigs.get(0).f1,
+                                       keySerializer);
+
+                       CompatibilityResult<V> valueCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
previousKvSerializersAndConfigs.get(1).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       
previousKvSerializersAndConfigs.get(1).f1,
+                                       valueSerializer);
 
                        if (!keyCompatResult.isRequiresMigration() && 
!valueCompatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 38e1254..9db3019 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -20,25 +20,22 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  * Configuration snapshot for serializers of maps, containing the
  * configuration snapshot of its key serializer and value serializer.
  */
 @Internal
-public final class MapSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+public final class MapSerializerConfigSnapshot<K, V> extends 
CompositeTypeSerializerConfigSnapshot {
 
        private static final int VERSION = 1;
 
        /** This empty nullary constructor is required for deserializing the 
configuration. */
        public MapSerializerConfigSnapshot() {}
 
-       public MapSerializerConfigSnapshot(
-                       TypeSerializerConfigSnapshot 
keySerializerConfigSnapshot,
-                       TypeSerializerConfigSnapshot 
valueSerializerConfigSnapshot) {
-
-               super(keySerializerConfigSnapshot, 
valueSerializerConfigSnapshot);
+       public MapSerializerConfigSnapshot(TypeSerializer<K> keySerializer, 
TypeSerializer<V> valueSerializer) {
+               super(keySerializer, valueSerializer);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 4373ee0..18ebcd8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -20,14 +20,18 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Either;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.flink.types.Either.Left;
 import static org.apache.flink.types.Either.Right;
@@ -193,19 +197,26 @@ public class EitherSerializer<L, R> extends 
TypeSerializer<Either<L, R>> {
 
        @Override
        public EitherSerializerConfigSnapshot snapshotConfiguration() {
-               return new EitherSerializerConfigSnapshot(
-                               leftSerializer.snapshotConfiguration(),
-                               rightSerializer.snapshotConfiguration());
+               return new EitherSerializerConfigSnapshot<>(leftSerializer, 
rightSerializer);
        }
 
        @Override
        public CompatibilityResult<Either<L, R>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                if (configSnapshot instanceof EitherSerializerConfigSnapshot) {
-                       TypeSerializerConfigSnapshot[] 
leftRightSerializerConfigSnapshots =
-                               ((EitherSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
-
-                       CompatibilityResult<L> leftCompatResult = 
leftSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[0]);
-                       CompatibilityResult<R> rightCompatResult = 
rightSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[1]);
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousLeftRightSerializersAndConfigs =
+                               ((EitherSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                       CompatibilityResult<L> leftCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
previousLeftRightSerializersAndConfigs.get(0).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       
previousLeftRightSerializersAndConfigs.get(0).f1,
+                                       leftSerializer);
+
+                       CompatibilityResult<R> rightCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
previousLeftRightSerializersAndConfigs.get(1).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       
previousLeftRightSerializersAndConfigs.get(1).f1,
+                                       rightSerializer);
 
                        if (!leftCompatResult.isRequiresMigration() && 
!rightCompatResult.isRequiresMigration()) {
                                return CompatibilityResult.compatible();

http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index 473d438..f996878 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.types.Either;
 
 /**
@@ -28,18 +28,15 @@ import org.apache.flink.types.Either;
  * containing configuration snapshots of the Left and Right serializers.
  */
 @Internal
-public final class EitherSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+public final class EitherSerializerConfigSnapshot<L, R> extends 
CompositeTypeSerializerConfigSnapshot {
 
        private static final int VERSION = 1;
 
        /** This empty nullary constructor is required for deserializing the 
configuration. */
        public EitherSerializerConfigSnapshot() {}
 
-       public EitherSerializerConfigSnapshot(
-                       TypeSerializerConfigSnapshot 
leftSerializerConfigSnapshot,
-                       TypeSerializerConfigSnapshot 
rightSerializerConfigSnapshot) {
-
-               super(leftSerializerConfigSnapshot, 
rightSerializerConfigSnapshot);
+       public EitherSerializerConfigSnapshot(TypeSerializer<L> leftSerializer, 
TypeSerializer<R> rightSerializer) {
+               super(leftSerializer, rightSerializer);
        }
 
        @Override

Reply via email to