[FLINK-6178] [core] Allow serializer upgrades for managed state

This commit adds the functionality of allowing serializer upgrades for
Flink's managed state. It consists of 2 major changes: 1) a new
user-facing API in `TypeSerializer`, and 2) activation of serializer
upgrades in the state backends.

For 1), the new user-facing API for `TypeSerializer`, the following is added:
- new class: TypeSerializerConfigSnapshot
- new class: CompatibilityResult
- new method: TypeSerializer#snapshotConfiguration()
- new method:
  TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)

Generally speaking, a configuration snapshot contains a point-in-time
view of a serializer's state / configuration, and is persisted along
with checkpoints. On restore, the snapshot is checked against the
new serializer of the state for compatibility, which may involve
reconfiguring the new serializer to make it compatible.

This compatibility check is integrated in the state backends' restore
flow in 2). Currently, if the check results in the need to perform state
migration, the restore simply fails as the state migration feature isn't
yet available.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8aa5e057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8aa5e057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8aa5e057

Branch: refs/heads/master
Commit: 8aa5e05733655e7b3d1f11ed15f61672d61e5cb5
Parents: 409319a
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon May 8 02:04:23 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Mon May 8 02:04:23 2017 +0800

----------------------------------------------------------------------
 .../typeutils/runtime/WritableSerializer.java   |  45 +-
 .../state/RocksDBKeyedStateBackend.java         | 203 ++++--
 .../streaming/state/RocksDBStateBackend.java    |   2 +-
 .../common/typeutils/CompatibilityResult.java   |  80 +++
 .../CompositeTypeSerializerConfigSnapshot.java  |  85 +++
 .../GenericTypeSerializerConfigSnapshot.java    |  88 +++
 .../ParameterlessTypeSerializerConfig.java      |  89 +++
 .../api/common/typeutils/TypeSerializer.java    |  53 +-
 .../typeutils/TypeSerializerConfigSnapshot.java | 103 +++
 .../TypeSerializerSerializationProxy.java       |  12 +-
 .../common/typeutils/TypeSerializerUtil.java    | 203 ++++++
 .../typeutils/base/BooleanSerializer.java       |   6 +
 .../typeutils/base/BooleanValueSerializer.java  |   9 +-
 .../common/typeutils/base/ByteSerializer.java   |   7 +-
 .../typeutils/base/ByteValueSerializer.java     |   9 +-
 .../common/typeutils/base/CharSerializer.java   |   7 +-
 .../typeutils/base/CharValueSerializer.java     |   9 +-
 .../CollectionSerializerConfigSnapshot.java     |  44 ++
 .../common/typeutils/base/DateSerializer.java   |   7 +
 .../common/typeutils/base/DoubleSerializer.java |   7 +-
 .../typeutils/base/DoubleValueSerializer.java   |   9 +-
 .../common/typeutils/base/EnumSerializer.java   | 175 ++++-
 .../common/typeutils/base/FloatSerializer.java  |   7 +-
 .../typeutils/base/FloatValueSerializer.java    |   9 +-
 .../typeutils/base/GenericArraySerializer.java  |  36 +-
 .../GenericArraySerializerConfigSnapshot.java   |  95 +++
 .../common/typeutils/base/IntSerializer.java    |   7 +-
 .../typeutils/base/IntValueSerializer.java      |   9 +-
 .../common/typeutils/base/ListSerializer.java   |  30 +-
 .../common/typeutils/base/LongSerializer.java   |   7 +-
 .../typeutils/base/LongValueSerializer.java     |   9 +-
 .../common/typeutils/base/MapSerializer.java    |  39 +-
 .../base/MapSerializerConfigSnapshot.java       |  48 ++
 .../common/typeutils/base/ShortSerializer.java  |   7 +-
 .../typeutils/base/ShortValueSerializer.java    |   9 +-
 .../typeutils/base/SqlDateSerializer.java       |   7 +
 .../typeutils/base/SqlTimeSerializer.java       |  12 +
 .../common/typeutils/base/StringSerializer.java |   6 +
 .../typeutils/base/StringValueSerializer.java   |   9 +-
 .../typeutils/base/TypeSerializerSingleton.java |  32 +
 .../array/BooleanPrimitiveArraySerializer.java  |   3 +-
 .../array/BytePrimitiveArraySerializer.java     |   2 +-
 .../array/CharPrimitiveArraySerializer.java     |   3 +-
 .../array/DoublePrimitiveArraySerializer.java   |   3 +-
 .../array/FloatPrimitiveArraySerializer.java    |   3 +-
 .../base/array/IntPrimitiveArraySerializer.java |   3 +-
 .../array/LongPrimitiveArraySerializer.java     |   3 +-
 .../array/ShortPrimitiveArraySerializer.java    |   3 +-
 .../base/array/StringArraySerializer.java       |   3 +-
 .../java/typeutils/runtime/AvroSerializer.java  | 144 +++-
 .../runtime/CopyableValueSerializer.java        |  42 +-
 .../typeutils/runtime/EitherSerializer.java     |  37 +
 .../runtime/EitherSerializerConfigSnapshot.java |  49 ++
 .../typeutils/runtime/KryoRegistration.java     | 173 +++++
 ...ryoRegistrationSerializerConfigSnapshot.java | 251 +++++++
 .../api/java/typeutils/runtime/KryoUtils.java   |  27 +
 .../java/typeutils/runtime/PojoSerializer.java  | 681 ++++++++++++++++---
 .../java/typeutils/runtime/RowSerializer.java   |  81 ++-
 .../typeutils/runtime/TupleSerializerBase.java  |  42 ++
 .../runtime/TupleSerializerConfigSnapshot.java  |  91 +++
 .../java/typeutils/runtime/ValueSerializer.java |  86 ++-
 .../typeutils/runtime/kryo/KryoSerializer.java  | 218 ++++--
 .../common/typeutils/SerializerTestBase.java    |  53 ++
 .../TypeSerializerConfigSnapshotTest.java       | 147 ++++
 .../typeutils/base/EnumSerializerTest.java      | 140 ++++
 .../array/CharPrimitiveArraySerializerTest.java |   2 -
 .../typeutils/runtime/PojoSerializerTest.java   | 270 +++++++-
 .../kryo/KryoSerializerCompatibilityTest.java   | 136 ++++
 .../api/java/io/CollectionInputFormatTest.java  |  12 +
 .../flink/cep/NonDuplicatingTypeSerializer.java |  16 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  47 +-
 .../AbstractKeyedCEPPatternOperator.java        |  34 +-
 .../valuearray/IntValueArraySerializer.java     |   7 +
 .../valuearray/LongValueArraySerializer.java    |   7 +
 .../valuearray/StringValueArraySerializer.java  |   7 +
 .../table/runtime/types/CRowSerializer.scala    |  53 +-
 .../MigrationNamespaceSerializerProxy.java      |  17 +-
 .../AbstractMigrationRestoreStrategy.java       |   8 +-
 .../runtime/state/ArrayListSerializer.java      |  31 +-
 .../state/DefaultOperatorStateBackend.java      | 130 ++--
 .../flink/runtime/state/HashMapSerializer.java  |  40 +-
 .../flink/runtime/state/JavaSerializer.java     |  19 +-
 .../state/KeyedBackendSerializationProxy.java   | 178 +----
 ...ckendStateMetaInfoSnapshotReaderWriters.java | 257 +++++++
 .../OperatorBackendSerializationProxy.java      | 168 ++---
 ...ckendStateMetaInfoSnapshotReaderWriters.java | 233 +++++++
 .../state/RegisteredBackendStateMetaInfo.java   | 145 ----
 .../RegisteredKeyedBackendStateMetaInfo.java    | 246 +++++++
 .../RegisteredOperatorBackendStateMetaInfo.java | 198 ++++++
 .../flink/runtime/state/StateMigrationUtil.java |  83 +++
 .../runtime/state/VoidNamespaceSerializer.java  |   8 +
 .../state/heap/CopyOnWriteStateTable.java       |  12 +-
 .../state/heap/HeapKeyedStateBackend.java       |  68 +-
 .../state/heap/NestedMapsStateTable.java        |   4 +-
 .../flink/runtime/state/heap/StateTable.java    |  12 +-
 .../state/heap/StateTableByKeyGroupReaders.java |   9 +-
 .../testutils/types/IntListSerializer.java      |  12 +
 .../testutils/types/IntPairSerializer.java      |  12 +
 .../testutils/types/StringPairSerializer.java   |  12 +
 .../runtime/query/QueryableStateClientTest.java |   6 +-
 .../runtime/state/OperatorStateBackendTest.java |   2 +-
 .../runtime/state/SerializationProxiesTest.java | 150 +++-
 .../runtime/state/StateBackendTestBase.java     | 305 ++++++++-
 .../state/heap/CopyOnWriteStateTableTest.java   |  34 +-
 .../StateTableSnapshotCompatibilityTest.java    |   6 +-
 .../testutils/recordutils/RecordSerializer.java |  12 +
 .../api/scala/typeutils/EitherSerializer.scala  |  48 +-
 .../scala/typeutils/EnumValueSerializer.scala   | 116 +++-
 .../api/scala/typeutils/NothingSerializer.scala |  12 +-
 .../api/scala/typeutils/OptionSerializer.scala  |  52 +-
 .../scala/typeutils/TraversableSerializer.scala |  13 +-
 .../api/scala/typeutils/TrySerializer.scala     |  58 +-
 .../MultiplexingStreamRecordSerializer.java     |  51 ++
 .../streamrecord/StreamRecordSerializer.java    |  49 ++
 .../api/datastream/CoGroupedStreams.java        |  12 +
 .../streaming/api/operators/InternalTimer.java  |  12 +
 .../api/windowing/windows/GlobalWindow.java     |  18 +-
 .../api/windowing/windows/TimeWindow.java       |  19 +-
 .../streamrecord/StreamElementSerializer.java   |  53 ++
 .../jar/CheckpointingCustomKvStateProgram.java  |   1 -
 120 files changed, 6429 insertions(+), 1011 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 9036d75..1a02e7b 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -20,7 +20,11 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 
 import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
@@ -30,7 +34,8 @@ import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.IOException;
 
-public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
+@Internal
+public final class WritableSerializer<T extends Writable> extends 
TypeSerializer<T> {
        
        private static final long serialVersionUID = 1L;
        
@@ -149,4 +154,42 @@ public class WritableSerializer<T extends Writable> 
extends TypeSerializer<T> {
        public boolean canEqual(Object obj) {
                return obj instanceof WritableSerializer;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public WritableSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return new WritableSerializerConfigSnapshot<>(typeClass);
+       }
+
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof WritableSerializerConfigSnapshot
+                               && 
typeClass.equals(((WritableSerializerConfigSnapshot) 
configSnapshot).getTypeClass())) {
+
+                       return CompatibilityResult.compatible();
+               } else {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+       }
+
+       public static final class WritableSerializerConfigSnapshot<T extends 
Writable>
+                       extends GenericTypeSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public WritableSerializerConfigSnapshot() {}
+
+               public WritableSerializerConfigSnapshot(Class<T> 
writableTypeClass) {
+                       super(writableTypeClass);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index b8e60cd..079ea13 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -26,7 +26,9 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -52,6 +54,7 @@ import 
org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -59,9 +62,9 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -151,7 +154,15 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * Information about the k/v states as we create them. This is used to 
retrieve the
         * column family that is used for a state and also for sanity checks 
when restoring.
         */
-       private Map<String, Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>>> kvStateInformation;
+       private Map<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+
+       /**
+        * Map of state names to their corresponding restored state meta info.
+        *
+        * TODO this map can be removed when eager-state registration is in 
place.
+        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
+        */
+       private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> 
restoredKvStateMetaInfos;
 
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
@@ -229,7 +240,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        // and access it in a synchronized block that locks on 
#dbDisposeLock.
                        if (db != null) {
 
-                               for (Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>> column :
+                               for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
                                                kvStateInformation.values()) {
                                        try {
                                                column.f0.close();
@@ -568,23 +579,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private void writeKVStateMetaData() throws IOException {
 
-                       List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> metaInfoList =
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
                                        new 
ArrayList<>(stateBackend.kvStateInformation.size());
 
                        int kvStateId = 0;
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>>> column :
+                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
                                        
stateBackend.kvStateInformation.entrySet()) {
 
-                               RegisteredBackendStateMetaInfo<?, ?> metaInfo = 
column.getValue().f1;
-
-                               KeyedBackendSerializationProxy.StateMetaInfo<?, 
?> metaInfoProxy =
-                                               new 
KeyedBackendSerializationProxy.StateMetaInfo<>(
-                                                               
metaInfo.getStateType(),
-                                                               
metaInfo.getName(),
-                                                               
metaInfo.getNamespaceSerializer(),
-                                                               
metaInfo.getStateSerializer());
-
-                               metaInfoList.add(metaInfoProxy);
+                               
metaInfoSnapshots.add(column.getValue().f1.snapshot());
 
                                //retrieve iterator for this k/v states
                                readOptions = new ReadOptions();
@@ -597,7 +599,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
 
                        KeyedBackendSerializationProxy serializationProxy =
-                                       new 
KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoList);
+                                       new 
KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), 
metaInfoSnapshots);
 
                        serializationProxy.write(outputView);
                }
@@ -717,7 +719,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private Map<String, StreamStateHandle> baseSstFiles;
 
-               private final 
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new 
ArrayList<>();
+               private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
 
                private FileSystem backupFileSystem;
                private Path backupPath;
@@ -800,7 +802,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
 
                                KeyedBackendSerializationProxy 
serializationProxy =
-                                       new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+                                       new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, 
stateMetaInfoSnapshots);
                                DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
 
                                serializationProxy.write(out);
@@ -823,18 +825,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
                        // save meta data
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : 
stateBackend.kvStateInformation.entrySet()) {
-
-                               RegisteredBackendStateMetaInfo<?, ?> metaInfo = 
stateMetaInfoEntry.getValue().f1;
-
-                               KeyedBackendSerializationProxy.StateMetaInfo<?, 
?> metaInfoProxy =
-                                       new 
KeyedBackendSerializationProxy.StateMetaInfo<>(
-                                               metaInfo.getStateType(),
-                                               metaInfo.getName(),
-                                               
metaInfo.getNamespaceSerializer(),
-                                               metaInfo.getStateSerializer());
-
-                               stateMetaInfos.add(metaInfoProxy);
+                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
+                                       : 
stateBackend.kvStateInformation.entrySet()) {
+                               
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
                        }
 
                        // save state data
@@ -1112,33 +1105,38 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        serializationProxy.read(currentStateHandleInView);
 
-                       List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> metaInfoProxyList =
-                                       
serializationProxy.getNamedStateSerializationProxies();
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
+                                       
serializationProxy.getStateMetaInfoSnapshots();
 
-                       currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(metaInfoProxyList.size());
+                       currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
+                       rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new 
HashMap<>(restoredMetaInfos.size());
 
-                       for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> 
metaInfoProxy : metaInfoProxyList) {
-                               Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>> columnFamily =
-                                               
rocksDBKeyedStateBackend.kvStateInformation.get(metaInfoProxy.getStateName());
+                       for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
 
-                               if (null == columnFamily) {
+                               if 
(!rocksDBKeyedStateBackend.kvStateInformation.containsKey(restoredMetaInfo.getName()))
 {
                                        ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
-                                               
metaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+                                               
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
                                                
rocksDBKeyedStateBackend.columnOptions);
 
-                                       RegisteredBackendStateMetaInfo<?, ?> 
stateMetaInfo =
-                                                       new 
RegisteredBackendStateMetaInfo<>(metaInfoProxy);
+                                       RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
+                                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                                               
restoredMetaInfo.getStateType(),
+                                                               
restoredMetaInfo.getName(),
+                                                               
restoredMetaInfo.getNamespaceSerializer(),
+                                                               
restoredMetaInfo.getStateSerializer());
 
-                                       columnFamily = new 
Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
-                                                       
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor),
-                                                       stateMetaInfo);
+                                       
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
 
-                                       
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
columnFamily);
+                                       ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
+
+                                       
rocksDBKeyedStateBackend.kvStateInformation.put(
+                                               stateMetaInfo.getName(),
+                                               new Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo));
+
+                                       
currentStateHandleKVStateColumnFamilies.add(columnFamily);
                                } else {
-                                       //TODO we could check here for 
incompatible serializer versions between previous tasks
+                                       // TODO with eager state registration 
in place, check here for serializer migration strategies
                                }
-
-                               
currentStateHandleKVStateColumnFamilies.add(columnFamily.f0);
                        }
                }
 
@@ -1198,7 +1196,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        this.stateBackend = stateBackend;
                }
 
-               private List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> readMetaData(
+               private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
                                StreamStateHandle metaStateHandle) throws 
Exception {
 
                        FSDataInputStream inputStream = null;
@@ -1212,7 +1210,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
                                serializationProxy.read(in);
 
-                               return 
serializationProxy.getNamedStateSerializationProxies();
+                               return 
serializationProxy.getStateMetaInfoSnapshots();
                        } finally {
                                if (inputStream != null) {
                                        
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
@@ -1294,18 +1292,21 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
 
                                // read meta data
-                               
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
+                               
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
                                        
readMetaData(restoreStateHandle.getMetaStateHandle());
 
                                List<ColumnFamilyDescriptor> 
columnFamilyDescriptors = new ArrayList<>();
 
-                               for 
(KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : 
stateMetaInfoProxies) {
+                               stateBackend.restoredKvStateMetaInfos = new 
HashMap<>(stateMetaInfoSnapshots.size());
+
+                               for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
 
                                        ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
-                                               
stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+                                               
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
                                                stateBackend.columnOptions);
 
                                        
columnFamilyDescriptors.add(columnFamilyDescriptor);
+                                       
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
                                }
 
                                if (hasExtraKeys) {
@@ -1320,23 +1321,27 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                for (int i = 0; i < 
columnFamilyHandles.size(); ++i) {
                                                        ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
                                                        ColumnFamilyDescriptor 
columnFamilyDescriptor = columnFamilyDescriptors.get(i);
-                                                       
KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = 
stateMetaInfoProxies.get(i);
+                                                       
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
 
-                                                       
Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> 
registeredStateMetaInfoEntry =
-                                                               
stateBackend.kvStateInformation.get(stateMetaInfoProxy.getStateName());
+                                                       
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> 
registeredStateMetaInfoEntry =
+                                                               
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
                                                        if (null == 
registeredStateMetaInfoEntry) {
 
-                                                               
RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
-                                                                       new 
RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy);
+                                                               
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+                                                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                                                               
stateMetaInfoSnapshot.getStateType(),
+                                                                               
stateMetaInfoSnapshot.getName(),
+                                                                               
stateMetaInfoSnapshot.getNamespaceSerializer(),
+                                                                               
stateMetaInfoSnapshot.getStateSerializer());
 
                                                                
registeredStateMetaInfoEntry =
-                                                                       new 
Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
+                                                                       new 
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
                                                                                
stateBackend.db.createColumnFamily(columnFamilyDescriptor),
                                                                                
stateMetaInfo);
 
                                                                
stateBackend.kvStateInformation.put(
-                                                                       
stateMetaInfoProxy.getStateName(),
+                                                                       
stateMetaInfoSnapshot.getName(),
                                                                        
registeredStateMetaInfoEntry);
                                                        }
 
@@ -1403,15 +1408,19 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                columnFamilyDescriptors, 
columnFamilyHandles);
 
                                        for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
-                                               
KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = 
stateMetaInfoProxies.get(i);
+                                               
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
 
                                                ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
-                                               
RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
-                                                       new 
RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy);
+                                               
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+                                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                                               
stateMetaInfoSnapshot.getStateType(),
+                                                               
stateMetaInfoSnapshot.getName(),
+                                                               
stateMetaInfoSnapshot.getNamespaceSerializer(),
+                                                               
stateMetaInfoSnapshot.getStateSerializer());
 
                                                
stateBackend.kvStateInformation.put(
-                                                       
stateMetaInfoProxy.getStateName(),
-                                                       new 
Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
+                                                       
stateMetaInfoSnapshot.getName(),
+                                                       new 
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
                                                                
columnFamilyHandle, stateMetaInfo));
                                        }
 
@@ -1473,22 +1482,57 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        protected <N, S> ColumnFamilyHandle getColumnFamily(
                        StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException {
 
-               Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, 
?>> stateInfo =
+               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                                kvStateInformation.get(descriptor.getName());
 
-               RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredBackendStateMetaInfo<>(
-                               descriptor.getType(),
-                               descriptor.getName(),
-                               namespaceSerializer,
-                               descriptor.getSerializer());
+               RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+                       descriptor.getType(),
+                       descriptor.getName(),
+                       namespaceSerializer,
+                       descriptor.getSerializer());
 
                if (stateInfo != null) {
-                       if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
+                       // TODO with eager registration in place, these checks 
should be moved to restore()
+
+                       RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfo =
+                               
restoredKvStateMetaInfos.get(descriptor.getName());
+
+                       Preconditions.checkState(
+                               
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+                               "Incompatible state names. " +
+                                       "Was [" + restoredMetaInfo.getName() + 
"], " +
+                                       "registered with [" + 
newMetaInfo.getName() + "].");
+
+                       if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+                               && 
!restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+                               Preconditions.checkState(
+                                       
newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+                                       "Incompatible state types. " +
+                                               "Was [" + 
restoredMetaInfo.getStateType() + "], " +
+                                               "registered with [" + 
newMetaInfo.getStateType() + "].");
+                       }
+
+                       // check compatibility results to determine if state 
migration is required
+
+                       CompatibilityResult<N> namespaceCompatibility = 
StateMigrationUtil.resolveCompatibilityResult(
+                                       
restoredMetaInfo.getNamespaceSerializer(),
+                                       MigrationNamespaceSerializerProxy.class,
+                                       
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+                                       newMetaInfo.getNamespaceSerializer());
+
+                       CompatibilityResult<S> stateCompatibility = 
StateMigrationUtil.resolveCompatibilityResult(
+                                       restoredMetaInfo.getStateSerializer(),
+                                       
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                                       
restoredMetaInfo.getStateSerializerConfigSnapshot(),
+                                       newMetaInfo.getStateSerializer());
+
+                       if (!namespaceCompatibility.requiresMigration() && 
!stateCompatibility.requiresMigration()) {
                                stateInfo.f1 = newMetaInfo;
                                return stateInfo.f0;
                        } else {
-                               throw new IOException("Trying to access state 
using wrong meta info, was " + stateInfo.f1 +
-                                               " trying access with " + 
newMetaInfo);
+                               // TODO state migration currently isn't 
possible.
+                               throw new RuntimeException("State migration 
currently isn't supported.");
                        }
                }
 
@@ -1497,7 +1541,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                try {
                        ColumnFamilyHandle columnFamily = 
db.createColumnFamily(columnDescriptor);
-                       Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<N, S>> tuple =
+                       Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
                                        new Tuple2<>(columnFamily, newMetaInfo);
                        Map rawAccess = kvStateInformation;
                        rawAccess.put(descriptor.getName(), tuple);
@@ -1832,6 +1876,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                // clear k/v state information before filling it
                kvStateInformation.clear();
 
+               restoredKvStateMetaInfos = new HashMap<>(namedStates.size());
+
                // first get the column family mapping
                int numColumns = inputView.readInt();
                Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new 
HashMap<>(numColumns);
@@ -1846,6 +1892,15 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        columnFamilyMapping.put(mappingByte, stateDescriptor);
 
+                       // mimic a restored kv state meta info
+                       restoredKvStateMetaInfos.put(
+                               stateDescriptor.getName(),
+                               new RegisteredKeyedBackendStateMetaInfo<>(
+                                       stateDescriptor.getType(),
+                                       stateDescriptor.getName(),
+                                       
MigrationNamespaceSerializerProxy.INSTANCE,
+                                       
stateDescriptor.getSerializer()).snapshot());
+
                        // this will fill in the k/v state information
                        getColumnFamily(stateDescriptor, 
MigrationNamespaceSerializerProxy.INSTANCE);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
index e5a78b6..695aa12 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -75,7 +75,7 @@ public class RocksDBStateBackend extends AbstractStateBackend 
{
                }
 
                private static void throwExceptionOnLoadingThisClass() {
-                       throw new RuntimeException("Attempt to migrate RocksDB 
state created with semi async snapshot mode failed. "
+                       throw new RuntimeException("Attempt to 
requiresMigration RocksDB state created with semi async snapshot mode failed. "
                                        + "Unfortunately, this is not 
supported. Please create a new savepoint for the job using fully "
                                        + "async mode in Flink 1.1 and run 
migration again with the new savepoint.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
new file mode 100644
index 0000000..cfbb516
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@code CompatibilityResult} contains information about whether or not 
data migration
+ * is required in order to continue using new serializers for previously 
serialized data.
+ *
+ * @param <T> the type of the data being migrated.
+ */
+@PublicEvolving
+public final class CompatibilityResult<T> {
+
+       /** Whether or not migration is required. */
+       private final boolean requiresMigration;
+
+       /**
+        * The convert deserializer to use for reading previous data during 
migration,
+        * in the case that the preceding serializer cannot be found.
+        *
+        * <p>This is only relevant if migration is required.
+        */
+       private final TypeSerializer<T> convertDeserializer;
+
+       /**
+        * Returns a result that signals that the new serializer is 
compatible and no migration is required.
+        *
+        * @return a result that signals migration is not required for the new 
serializer
+        */
+       public static <T> CompatibilityResult<T> compatible() {
+               return new CompatibilityResult<>(false, null);
+       }
+
+       /**
+        * Returns a result that signals that migration needs to be performed.
+        *
+        * <p>Furthermore, in the case that the preceding serializer cannot be 
found or restored to read the
+        * previous data during migration, a provided convert deserializer can 
be used (may be {@code null}
+        * if one cannot be provided).
+        *
+        * <p>In the case that the preceding serializer cannot be found and a 
convert deserializer is not
+        * provided, the migration will fail due to the incapability of reading 
previous data.
+        *
+        * @return a result that signals migration is necessary, possibly 
providing a convert deserializer.
+        */
+       public static <T> CompatibilityResult<T> 
requiresMigration(TypeSerializer<T> convertDeserializer) {
+               return new CompatibilityResult<>(true, convertDeserializer);
+       }
+
+       private CompatibilityResult(boolean requiresMigration, 
TypeSerializer<T> convertDeserializer) {
+               this.requiresMigration = requiresMigration;
+               this.convertDeserializer = convertDeserializer;
+       }
+
+       public TypeSerializer<T> getConvertDeserializer() {
+               return convertDeserializer;
+       }
+
+       public boolean requiresMigration() {
+               return requiresMigration;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
new file mode 100644
index 0000000..e7e2650
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for serializers that have multiple 
nested serializers.
+ * The configuration snapshot consists of the configuration snapshots of all 
nested serializers.
+ */
+@Internal
+public abstract class CompositeTypeSerializerConfigSnapshot extends 
TypeSerializerConfigSnapshot {
+
+       private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public CompositeTypeSerializerConfigSnapshot() {}
+
+       public 
CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... 
nestedSerializerConfigSnapshots) {
+               this.nestedSerializerConfigSnapshots = 
Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               super.write(out);
+               TypeSerializerUtil.writeSerializerConfigSnapshots(out, 
nestedSerializerConfigSnapshots);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               super.read(in);
+               nestedSerializerConfigSnapshots = 
TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
+       }
+
+       public TypeSerializerConfigSnapshot[] 
getNestedSerializerConfigSnapshots() {
+               return nestedSerializerConfigSnapshots;
+       }
+
+       public TypeSerializerConfigSnapshot 
getSingleNestedSerializerConfigSnapshot() {
+               return nestedSerializerConfigSnapshots[0];
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+
+               if (obj == null) {
+                       return false;
+               }
+
+               return (obj.getClass().equals(getClass()))
+                               && Arrays.equals(
+                                       nestedSerializerConfigSnapshots,
+                                       
((CompositeTypeSerializerConfigSnapshot) 
obj).getNestedSerializerConfigSnapshots());
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(nestedSerializerConfigSnapshots);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
new file mode 100644
index 0000000..4edfe12
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerConfigSnapshot.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Configuration snapshot for serializers for generic types.
+ *
+ * @param <T> The type to be instantiated.
+ */
+@Internal
+public abstract class GenericTypeSerializerConfigSnapshot<T> extends 
TypeSerializerConfigSnapshot {
+
+       private Class<T> typeClass;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public GenericTypeSerializerConfigSnapshot() {}
+
+       public GenericTypeSerializerConfigSnapshot(Class<T> typeClass) {
+               this.typeClass = Preconditions.checkNotNull(typeClass);
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               super.write(out);
+
+               // write only the classname to avoid Java serialization
+               out.writeUTF(typeClass.getName());
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void read(DataInputView in) throws IOException {
+               super.read(in);
+
+               String genericTypeClassname = in.readUTF();
+               try {
+                       typeClass = (Class<T>) 
Class.forName(genericTypeClassname, true, getUserCodeClassLoader());
+               } catch (ClassNotFoundException e) {
+                       throw new IOException("Could not find the requested 
class " + genericTypeClassname + " in classpath.", e);
+               }
+       }
+
+       public Class<T> getTypeClass() {
+               return typeClass;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+
+               if (obj == null) {
+                       return false;
+               }
+
+               return (obj.getClass().equals(getClass()))
+                               && 
typeClass.equals(((GenericTypeSerializerConfigSnapshot) obj).getTypeClass());
+       }
+
+       @Override
+       public int hashCode() {
+               return typeClass.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
new file mode 100644
index 0000000..7ba7dd4
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for serializers that do not have any 
parameters.
+ */
+@Internal
+public final class ParameterlessTypeSerializerConfig extends 
TypeSerializerConfigSnapshot {
+
+       private static final int VERSION = 1;
+
+       /**
+        * A string identifier that encodes the serialization format used by 
the serializer.
+        *
+        * TODO we might change this to a proper serialization format class in 
the future
+        */
+       private String serializationFormatIdentifier;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public ParameterlessTypeSerializerConfig() {}
+
+       public ParameterlessTypeSerializerConfig(String 
serializationFormatIdentifier) {
+               this.serializationFormatIdentifier = 
Preconditions.checkNotNull(serializationFormatIdentifier);
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               super.write(out);
+               out.writeUTF(serializationFormatIdentifier);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               super.read(in);
+               serializationFormatIdentifier = in.readUTF();
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+
+       public String getSerializationFormatIdentifier() {
+               return serializationFormatIdentifier;
+       }
+
+       @Override
+       public boolean equals(Object other) {
+               if (other == this) {
+                       return true;
+               }
+
+               if (other == null) {
+                       return false;
+               }
+
+               return (other instanceof ParameterlessTypeSerializerConfig)
+                               && 
serializationFormatIdentifier.equals(((ParameterlessTypeSerializerConfig) 
other).getSerializationFormatIdentifier());
+       }
+
+       @Override
+       public int hashCode() {
+               return serializationFormatIdentifier.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 6edaec6..f0562d4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -161,7 +161,54 @@ public abstract class TypeSerializer<T> implements 
Serializable {
 
        public abstract int hashCode();
 
-       public boolean canRestoreFrom(TypeSerializer<?> other) {
-               return equals(other);
-       }
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Create a snapshot of the serializer's current configuration to be 
stored along with the managed state it is
+        * registered to (if any - this method is only relevant if this 
serializer is registered for serialization of
+        * managed state).
+        *
+        * <p>The configuration snapshot should contain information about the 
serializer's parameter settings and its
+        * serialization format. When a new serializer is registered to 
serialize the same managed state that this
+        * serializer was registered to, the returned configuration snapshot 
can be used to ensure compatibility
+        * of the new serializer and determine if state migration is required.
+        *
+        * @see TypeSerializerConfigSnapshot
+        *
+        * @return snapshot of the serializer's current configuration (cannot 
be {@code null}).
+        */
+       public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+
+       /**
+        * Ensure compatibility of this serializer with a preceding serializer 
that was registered for serialization of
+        * the same managed state (if any - this method is only relevant if 
this serializer is registered for
+        * serialization of managed state).
+        *
+        * <p>The compatibility check in this method should be performed by 
inspecting the preceding serializer's configuration
+        * snapshot. The method may reconfigure the serializer (if required and 
possible) so that it may be compatible,
+        * or provide a signaling result that informs Flink that state 
migration is necessary before continuing to use
+        * this serializer.
+        *
+        * <p>The result can be one of the following:
+        * <ul>
+        *     <li>{@link CompatibilityResult#compatible()}: this signals Flink 
that this serializer is compatible, or
+        *     has been reconfigured to be compatible, to continue reading 
previous data, and that the
+        *     serialization schema remains the same. No migration needs to be 
performed.</li>
+        *
+        *     <li>{@link 
CompatibilityResult#requiresMigration(TypeSerializer)}: this signals Flink that
+        *     migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
+        *     compatible, for previous data. Furthermore, in the case that the 
preceding serializer cannot be found or
+        *     restored to read the previous data to perform the migration, the 
provided convert deserializer can be
+        *     used (may be {@code null} if one cannot be provided).</li>
+        * </ul>
+        *
+        * @see CompatibilityResult
+        *
+        * @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
+        *
+        * @return the determined compatibility result.
+        */
+       public abstract CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
new file mode 100644
index 0000000..27369b9
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.VersionMismatchException;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer}'s configuration.
+ * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
+ * serializer is registered to.
+ *
+ * <p>The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
+ * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
+ * should encode sufficient information about:
+ *
+ * <ul>
+ *   <li><strong>Parameter settings of the serializer:</strong> parameters of 
the serializer include settings
+ *   required to set up the serializer, or the state of the serializer if it is 
stateful. If the serializer
+ *   has nested serializers, then the configuration snapshot should also 
contain the parameters of the nested
+ *   serializers.</li>
+ *
+ *   <li><strong>Serialization schema of the serializer:</strong> the data 
format used by the serializer.</li>
+ * </ul>
+ *
+ * <p>NOTE: Implementations must contain the default empty nullary 
constructor. This is required to be able to
+ * deserialize the configuration snapshot from its binary form.
+ */
+@PublicEvolving
+public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWritable {
+
+       /** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
+       private ClassLoader userCodeClassLoader;
+
+       /** The snapshot version of this configuration. */
+       private Integer snapshotVersion;
+
+       /**
+        * Returns the version of the configuration at the time its snapshot 
was taken.
+        *
+        * @return the snapshot configuration's version.
+        */
+       public int getSnapshotVersion() {
+               if (snapshotVersion == null) {
+                       return getVersion();
+               } else {
+                       return snapshotVersion;
+               }
+       }
+
+       /**
+        * Set the user code class loader.
+        * Only relevant if this configuration instance was deserialized from 
binary form.
+        *
+        * <p>This method is not part of the public user-facing API, and cannot 
be overridden.
+        *
+        * @param userCodeClassLoader user code class loader.
+        */
+       @Internal
+       public final void setUserCodeClassLoader(ClassLoader 
userCodeClassLoader) {
+               this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
+       }
+
+       /**
+        * Returns the user code class loader.
+        * Only relevant if this configuration instance was deserialized from 
binary form.
+        *
+        * @return the user code class loader
+        */
+       @Internal
+       public final ClassLoader getUserCodeClassLoader() {
+               return userCodeClassLoader;
+       }
+
+       @Override
+       protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
+               super.resolveVersionRead(foundVersion);
+               this.snapshotVersion = foundVersion;
+       }
+
+       public abstract boolean equals(Object obj);
+
+       public abstract int hashCode();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
index c94124f..067a1ca 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -143,7 +143,7 @@ public class TypeSerializerSerializationProxy<T> extends 
VersionedIOReadableWrit
         * Dummy TypeSerializer to avoid that data is lost when checkpointing 
again a serializer for which we encountered
         * a {@link ClassNotFoundException}.
         */
-       static final class ClassNotFoundDummyTypeSerializer<T> extends 
TypeSerializer<T> {
+       public static final class ClassNotFoundDummyTypeSerializer<T> extends 
TypeSerializer<T> {
 
                private static final long serialVersionUID = 
2526330533671642711L;
                private final byte[] actualBytes;
@@ -207,6 +207,16 @@ public class TypeSerializerSerializationProxy<T> extends 
VersionedIOReadableWrit
                }
 
                @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
+               }
+
+               @Override
+               public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       throw new UnsupportedOperationException("This object is 
a dummy TypeSerializer.");
+               }
+
+               @Override
                public boolean canEqual(Object obj) {
                        return false;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
new file mode 100644
index 0000000..0a2148a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtil.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Utility methods for {@link TypeSerializer} and {@link 
TypeSerializerConfigSnapshot}.
+ */
+@Internal
+public class TypeSerializerUtil {
+
+       /**
+        * Creates an array of {@link TypeSerializerConfigSnapshot}s taken
+        * from the provided array of {@link TypeSerializer}s.
+        *
+        * @param serializers array of type serializers.
+        *
+        * @return array of configuration snapshots taken from each serializer.
+        */
+       public static TypeSerializerConfigSnapshot[] 
snapshotConfigurations(TypeSerializer<?>[] serializers) {
+               final TypeSerializerConfigSnapshot[] configSnapshots = new 
TypeSerializerConfigSnapshot[serializers.length];
+
+               for (int i = 0; i < serializers.length; i++) {
+                       configSnapshots[i] = 
serializers[i].snapshotConfiguration();
+               }
+
+               return configSnapshots;
+       }
+
+       /**
+        * Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+        *
+        * <p>It is written with a format that can be later read again using
+        * {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+        *
+        * @param out the data output view
+        * @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+        *
+        * @throws IOException
+        */
+       public static void writeSerializerConfigSnapshot(
+                       DataOutputView out,
+                       TypeSerializerConfigSnapshot serializerConfigSnapshot) 
throws IOException {
+
+               new 
TypeSerializerConfigSnapshotProxy(serializerConfigSnapshot).write(out);
+       }
+
+       /**
+        * Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+        * written using {@link #writeSerializerConfigSnapshot(DataOutputView, 
TypeSerializerConfigSnapshot)}.
+        *
+        * @param in the data input view
+        * @param userCodeClassLoader the user code class loader to use
+        *
+        * @return the read serializer configuration snapshot
+        *
+        * @throws IOException
+        */
+       public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader) throws IOException {
+
+               final TypeSerializerConfigSnapshotProxy proxy = new 
TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
+               proxy.read(in);
+
+               return proxy.getSerializerConfigSnapshot();
+       }
+
+       /**
+        * Writes multiple {@link TypeSerializerConfigSnapshot}s to the 
provided data output view.
+        *
+        * <p>It is written with a format that can be later read again using
+        * {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
+        *
+        * @param out the data output view
+        * @param serializerConfigSnapshots the serializer configuration 
snapshots to write
+        *
+        * @throws IOException
+        */
+       public static void writeSerializerConfigSnapshots(
+                       DataOutputView out,
+                       TypeSerializerConfigSnapshot... 
serializerConfigSnapshots) throws IOException {
+
+               out.writeInt(serializerConfigSnapshots.length);
+
+               for (TypeSerializerConfigSnapshot snapshot : 
serializerConfigSnapshots) {
+                       new 
TypeSerializerConfigSnapshotProxy(snapshot).write(out);
+               }
+       }
+
+       /**
+        * Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that were previously
+        * written using {@link #writeSerializerConfigSnapshots(DataOutputView, 
TypeSerializerConfigSnapshot...)}.
+        *
+        * @param in the data input view
+        * @param userCodeClassLoader the user code class loader to use
+        *
+        * @return the read serializer configuration snapshots
+        *
+        * @throws IOException
+        */
+       public static TypeSerializerConfigSnapshot[] 
readSerializerConfigSnapshots(
+                       DataInputView in,
+                       ClassLoader userCodeClassLoader) throws IOException {
+
+               int numFields = in.readInt();
+               final TypeSerializerConfigSnapshot[] serializerConfigSnapshots 
= new TypeSerializerConfigSnapshot[numFields];
+
+               TypeSerializerConfigSnapshotProxy proxy;
+               for (int i = 0; i < numFields; i++) {
+                       proxy = new 
TypeSerializerConfigSnapshotProxy(userCodeClassLoader);
+                       proxy.read(in);
+                       serializerConfigSnapshots[i] = 
proxy.getSerializerConfigSnapshot();
+               }
+
+               return serializerConfigSnapshots;
+       }
+
+       /**
+        * Utility serialization proxy for a {@link 
TypeSerializerConfigSnapshot}.
+        */
+       static class TypeSerializerConfigSnapshotProxy extends 
VersionedIOReadableWritable {
+
+               private static final int VERSION = 1;
+
+               private ClassLoader userCodeClassLoader;
+               private TypeSerializerConfigSnapshot serializerConfigSnapshot;
+
+               TypeSerializerConfigSnapshotProxy(ClassLoader 
userCodeClassLoader) {
+                       this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
+               }
+
+               TypeSerializerConfigSnapshotProxy(TypeSerializerConfigSnapshot 
serializerConfigSnapshot) {
+                       this.serializerConfigSnapshot = 
serializerConfigSnapshot;
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       // config snapshot class, so that we can re-instantiate 
the
+                       // correct type of config snapshot instance when 
deserializing
+                       
out.writeUTF(serializerConfigSnapshot.getClass().getName());
+
+                       // the actual configuration parameters
+                       serializerConfigSnapshot.write(out);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       String serializerConfigClassname = in.readUTF();
+                       Class<? extends TypeSerializerConfigSnapshot> 
serializerConfigSnapshotClass;
+                       try {
+                               serializerConfigSnapshotClass = (Class<? 
extends TypeSerializerConfigSnapshot>)
+                                       
Class.forName(serializerConfigClassname, true, userCodeClassLoader);
+                       } catch (ClassNotFoundException e) {
+                               throw new IOException(
+                                       "Could not find requested 
TypeSerializerConfigSnapshot class "
+                                               + serializerConfigClassname +  
" in classpath.", e);
+                       }
+
+                       serializerConfigSnapshot = 
InstantiationUtil.instantiate(serializerConfigSnapshotClass);
+                       
serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
+                       serializerConfigSnapshot.read(in);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               TypeSerializerConfigSnapshot getSerializerConfigSnapshot() {
+                       return serializerConfigSnapshot;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 609f184..f275807 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -82,4 +82,10 @@ public final class BooleanSerializer extends 
TypeSerializerSingleton<Boolean> {
        public boolean canEqual(Object obj) {
                return obj instanceof BooleanSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(BooleanValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index 62c91df..4755549 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -31,8 +31,7 @@ public final class BooleanValueSerializer extends 
TypeSerializerSingleton<Boolea
        private static final long serialVersionUID = 1L;
        
        public static final BooleanValueSerializer INSTANCE = new 
BooleanValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -86,4 +85,10 @@ public final class BooleanValueSerializer extends 
TypeSerializerSingleton<Boolea
        public boolean canEqual(Object obj) {
                return obj instanceof BooleanValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(BooleanSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index 6ad7e4e..bf2baf5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -33,7 +33,6 @@ public final class ByteSerializer extends 
TypeSerializerSingleton<Byte> {
        
        private static final Byte ZERO = Byte.valueOf((byte) 0);
 
-
        @Override
        public boolean isImmutableType() {
                return true;
@@ -83,4 +82,10 @@ public final class ByteSerializer extends 
TypeSerializerSingleton<Byte> {
        public boolean canEqual(Object obj) {
                return obj instanceof ByteSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(ByteValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index 848b01e..2547dda 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -31,8 +31,7 @@ public final class ByteValueSerializer extends 
TypeSerializerSingleton<ByteValue
        private static final long serialVersionUID = 1L;
        
        public static final ByteValueSerializer INSTANCE = new 
ByteValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -84,4 +83,10 @@ public final class ByteValueSerializer extends 
TypeSerializerSingleton<ByteValue
        public boolean canEqual(Object obj) {
                return obj instanceof ByteValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                               || 
identifier.equals(ByteSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 21a32f1..dda3543 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -33,7 +33,6 @@ public final class CharSerializer extends 
TypeSerializerSingleton<Character> {
        
        private static final Character ZERO = Character.valueOf((char)0);
 
-
        @Override
        public boolean isImmutableType() {
                return true;
@@ -83,4 +82,10 @@ public final class CharSerializer extends 
TypeSerializerSingleton<Character> {
        public boolean canEqual(Object obj) {
                return obj instanceof CharSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(CharValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 84dc39a..e012b8c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -31,8 +31,7 @@ public class CharValueSerializer extends 
TypeSerializerSingleton<CharValue> {
        private static final long serialVersionUID = 1L;
        
        public static final CharValueSerializer INSTANCE = new 
CharValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -84,4 +83,10 @@ public class CharValueSerializer extends 
TypeSerializerSingleton<CharValue> {
        public boolean canEqual(Object obj) {
                return obj instanceof CharValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(CharSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
new file mode 100644
index 0000000..8fa2315
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * Configuration snapshot of a serializer for collection types.
+ */
+@Internal
+public final class CollectionSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+       private static final int VERSION = 1;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public CollectionSerializerConfigSnapshot() {}
+
+       public CollectionSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
elementSerializerConfigSnapshot) {
+               super(elementSerializerConfigSnapshot);
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 3f27de2..28ed904 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -101,4 +101,11 @@ public final class DateSerializer extends 
TypeSerializerSingleton<Date> {
        public boolean canEqual(Object obj) {
                return obj instanceof DateSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(SqlDateSerializer.class.getCanonicalName())
+                       || 
identifier.equals(SqlTimeSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 375cb9c..92fe71d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -33,7 +33,6 @@ public final class DoubleSerializer extends 
TypeSerializerSingleton<Double> {
        
        private static final Double ZERO = Double.valueOf(0);
 
-       
        @Override
        public boolean isImmutableType() {
                return true;
@@ -83,4 +82,10 @@ public final class DoubleSerializer extends 
TypeSerializerSingleton<Double> {
        public boolean canEqual(Object obj) {
                return obj instanceof DoubleSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(DoubleValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index 232ad6b..9e7e8d0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -31,8 +31,7 @@ public final class DoubleValueSerializer extends 
TypeSerializerSingleton<DoubleV
        private static final long serialVersionUID = 1L;
        
        public static final DoubleValueSerializer INSTANCE = new 
DoubleValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -84,4 +83,10 @@ public final class DoubleValueSerializer extends 
TypeSerializerSingleton<DoubleV
        public boolean canEqual(Object obj) {
                return obj instanceof DoubleValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(DoubleSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index c45487f..2f74d84 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -20,11 +20,24 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -36,7 +49,21 @@ public final class EnumSerializer<T extends Enum<T>> extends 
TypeSerializer<T> {
 
        private final Class<T> enumClass;
 
-       private transient T[] values;
+       /**
+        * Maintain our own map of enum values to their ordinals, instead of 
directly using {@link Enum#ordinal}.
+        * This allows us to maintain backwards compatibility for previous 
serialized data in the case that the
+        * order of enum constants was changed or new constants were added.
+        *
+        * <p>On a fresh start with no reconfiguration, the ordinals would 
simply be identical to the enum
+        * constants' actual ordinals. Ordinals may change after reconfiguration.
+        */
+       private Map<T, Integer> valueToOrdinal;
+
+       /**
+        * Array of enum constants with their indexes identical to their 
ordinals in the {@link #valueToOrdinal} map.
+        * Serves as a bidirectional map to have fast access from ordinal to 
value. May be reordered after reconfiguration.
+        */
+       private T[] values;
 
        public EnumSerializer(Class<T> enumClass) {
                this.enumClass = checkNotNull(enumClass);
@@ -44,6 +71,12 @@ public final class EnumSerializer<T extends Enum<T>> extends 
TypeSerializer<T> {
 
                this.values = enumClass.getEnumConstants();
                checkArgument(this.values.length > 0, "cannot use an empty 
enum");
+
+               this.valueToOrdinal = new HashMap<>(values.length);
+               int i = 0;
+               for (T value : values) {
+                       this.valueToOrdinal.put(value, i++);
+               }
        }
 
        @Override
@@ -78,7 +111,8 @@ public final class EnumSerializer<T extends Enum<T>> extends 
TypeSerializer<T> {
 
        @Override
        public void serialize(T record, DataOutputView target) throws 
IOException {
-               target.writeInt(record.ordinal());
+               // use our own maintained ordinals instead of the actual enum 
ordinal
+               target.writeInt(valueToOrdinal.get(record));
        }
 
        @Override
@@ -121,6 +155,141 @@ public final class EnumSerializer<T extends Enum<T>> 
extends TypeSerializer<T> {
 
        private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
                in.defaultReadObject();
-               this.values = enumClass.getEnumConstants();
+
+               // may be null if this serializer was deserialized from an 
older version
+               if (this.values == null) {
+                       this.values = enumClass.getEnumConstants();
+
+                       this.valueToOrdinal = new HashMap<>(values.length);
+                       int i = 0;
+                       for (T value : values) {
+                               this.valueToOrdinal.put(value, i++);
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public EnumSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return new EnumSerializerConfigSnapshot<>(enumClass, values);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof EnumSerializerConfigSnapshot) {
+                       final EnumSerializerConfigSnapshot<T> config = 
(EnumSerializerConfigSnapshot<T>) configSnapshot;
+
+                       if (enumClass.equals(config.getTypeClass())) {
+
+                               // reorder enum constants so that previously 
existing constants
+                               // remain in the same order, and new constants are appended after them
+                               LinkedHashSet<T> reorderedEnumConstants = new 
LinkedHashSet<>();
+                               
reorderedEnumConstants.addAll(Arrays.asList(config.getEnumConstants()));
+                               
reorderedEnumConstants.addAll(Arrays.asList(enumClass.getEnumConstants()));
+
+                               // regenerate enum constant to ordinal 
bidirectional map
+                               this.values = (T[]) 
Array.newInstance(enumClass, reorderedEnumConstants.size());
+                               this.valueToOrdinal.clear();
+                               int i = 0;
+                               for (T constant : reorderedEnumConstants) {
+                                       this.values[i] = constant;
+                                       this.valueToOrdinal.put(constant, i);
+                                       i++;
+                               }
+
+                               return CompatibilityResult.compatible();
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       /**
+        * Configuration snapshot of a serializer for enumerations.
+        *
+        * <p>Configuration contains the enum class, and an array of the enum's 
constants
+        * that existed when the configuration snapshot was taken.
+        *
+        * @param <T> the enum type.
+        */
+       public static final class EnumSerializerConfigSnapshot<T extends 
Enum<T>>
+                       extends GenericTypeSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               private T[] enumConstants;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public EnumSerializerConfigSnapshot() {}
+
+               public EnumSerializerConfigSnapshot(Class<T> enumClass, T[] 
enumConstants) {
+                       super(enumClass);
+                       this.enumConstants = 
Preconditions.checkNotNull(enumConstants);
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       try (final DataOutputViewStream outViewWrapper = new 
DataOutputViewStream(out)) {
+                               
InstantiationUtil.serializeObject(outViewWrapper, enumConstants);
+                       }
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
+                               try {
+                                       enumConstants = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+                               } catch (ClassNotFoundException e) {
+                                       throw new IOException("The requested 
enum class cannot be found in classpath.", e);
+                               } catch (IllegalArgumentException e) {
+                                       throw new IOException("A previously 
existing enum constant of "
+                                               + getTypeClass().getName() + " 
no longer exists.", e);
+                               }
+                       }
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               public T[] getEnumConstants() {
+                       return enumConstants;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return super.equals(obj)
+                                       && Arrays.equals(
+                                               enumConstants,
+                                               ((EnumSerializerConfigSnapshot) 
obj).getEnumConstants());
+               }
+
+               @Override
+               public int hashCode() {
+                       return super.hashCode() * 31 + 
Arrays.hashCode(enumConstants);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Test utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       @VisibleForTesting
+       T[] getValues() {
+               return values;
+       }
+
+       @VisibleForTesting
+       Map<T, Integer> getValueToOrdinal() {
+               return valueToOrdinal;
        }
 }

Reply via email to