Repository: flink
Updated Branches:
  refs/heads/master 5363595d5 -> f1ac0f279


http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
new file mode 100644
index 0000000..a3b1109
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class holds the deprecated implementations of readers for state meta 
infos. They can be removed when we drop
+ * backwards compatibility.
+ */
+public class LegacyStateMetaInfoReaders {
+
+       private LegacyStateMetaInfoReaders() {
+       }
+
+       /**
+        * Implementation of {@link StateMetaInfoReader} for version 3 of keyed 
state.
+        * - v3: Flink 1.4.x, 1.5.x
+        */
+       static class KeyedBackendStateMetaInfoReaderV3V4 implements 
StateMetaInfoReader {
+
+               static final KeyedBackendStateMetaInfoReaderV3V4 INSTANCE = new 
KeyedBackendStateMetaInfoReaderV3V4();
+
+               @Nonnull
+               @Override
+               public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+                       @Nonnull DataInputView in, @Nonnull ClassLoader 
userCodeClassLoader) throws IOException {
+
+                       final StateDescriptor.Type stateDescType = 
StateDescriptor.Type.values()[in.readInt()];
+                       final String stateName = in.readUTF();
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersAndConfigs =
+                               
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader);
+
+                       Map<String, String> optionsMap = 
Collections.singletonMap(
+                               
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
+                               stateDescType.toString());
+
+
+                       Map<String, TypeSerializer<?>> serializerMap = new 
HashMap<>(2);
+                       
serializerMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
+                               serializersAndConfigs.get(0).f0);
+                       
serializerMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                               serializersAndConfigs.get(1).f0);
+
+                       Map<String, TypeSerializerConfigSnapshot> 
serializerConfigSnapshotMap = new HashMap<>(2);
+                       
serializerConfigSnapshotMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
+                               serializersAndConfigs.get(0).f1);
+                       
serializerConfigSnapshotMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                               serializersAndConfigs.get(1).f1);
+
+                       return new StateMetaInfoSnapshot(
+                               stateName,
+                               
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+                               optionsMap,
+                               serializerConfigSnapshotMap,
+                               serializerMap);
+               }
+       }
+
+       /**
+        * Implementation of {@link StateMetaInfoReader} for version 1 and 2 of 
keyed state.
+        * - v1: Flink 1.2.x
+        * - v2: Flink 1.3.x
+        */
+       static class KeyedBackendStateMetaInfoReaderV1V2 implements 
StateMetaInfoReader {
+
+               static final KeyedBackendStateMetaInfoReaderV1V2 INSTANCE = new 
KeyedBackendStateMetaInfoReaderV1V2();
+
+               @Nonnull
+               @Override
+               public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+                       @Nonnull DataInputView in,
+                       @Nonnull ClassLoader userCodeClassLoader) throws 
IOException {
+
+                       final StateDescriptor.Type stateDescType = 
StateDescriptor.Type.values()[in.readInt()];
+                       final String stateName = in.readUTF();
+
+                       Map<String, String> optionsMap = 
Collections.singletonMap(
+                               
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
+                               stateDescType.toString());
+
+
+                       Map<String, TypeSerializer<?>> serializerMap = new 
HashMap<>(2);
+                       serializerMap.put(
+                               
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
+                               
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, 
true));
+                       serializerMap.put(
+                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                               
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, 
true));
+                       return new StateMetaInfoSnapshot(
+                               stateName,
+                               
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+                               optionsMap,
+                               Collections.emptyMap(),
+                               serializerMap);
+               }
+       }
+
+       /**
+        * Unified reader for older versions of operator (version 2 and 3) AND 
broadcast state (version 3).
+        * <p>
+        * - v2: Flink 1.3.x, 1.4.x
+        * - v3: Flink 1.5.x
+        */
+       static class OperatorBackendStateMetaInfoReaderV2V3 implements 
StateMetaInfoReader {
+
+               static final OperatorBackendStateMetaInfoReaderV2V3 INSTANCE = 
new OperatorBackendStateMetaInfoReaderV2V3();
+
+               private static final String[] ORDERED_KEY_STRINGS =
+                       new String[]{
+                               
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
+                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()};
+
+               @Nonnull
+               @Override
+               public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+                       @Nonnull DataInputView in,
+                       @Nonnull ClassLoader userCodeClassLoader) throws 
IOException {
+
+                       final String name = in.readUTF();
+                       final OperatorStateHandle.Mode mode = 
OperatorStateHandle.Mode.values()[in.readByte()];
+
+                       Map<String, String> optionsMap = 
Collections.singletonMap(
+                               
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+                               mode.toString());
+
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> stateSerializerAndConfigList =
+                               
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader);
+
+                       final int listSize = 
stateSerializerAndConfigList.size();
+                       StateMetaInfoSnapshot.BackendStateType stateType = 
listSize == 1 ?
+                               StateMetaInfoSnapshot.BackendStateType.OPERATOR 
: StateMetaInfoSnapshot.BackendStateType.BROADCAST;
+                       Map<String, TypeSerializer<?>> serializerMap = new 
HashMap<>(listSize);
+                       Map<String, TypeSerializerConfigSnapshot> 
serializerConfigsMap = new HashMap<>(listSize);
+                       for (int i = 0; i < listSize; ++i) {
+                               Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> serializerAndConf =
+                                       stateSerializerAndConfigList.get(i);
+
+                               // this particular mapping happens to support 
both, V2 and V3
+                               String serializerKey = 
ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
+
+                               serializerMap.put(
+                                       serializerKey,
+                                       serializerAndConf.f0);
+                               serializerConfigsMap.put(
+                                       serializerKey,
+                                       serializerAndConf.f1);
+                       }
+
+                       return new StateMetaInfoSnapshot(
+                               name,
+                               stateType,
+                               optionsMap,
+                               serializerConfigsMap,
+                               serializerMap);
+               }
+       }
+
+       /**
+        * Reader for older versions of operator state (version 1).
+        * - v1: Flink 1.2.x
+        */
+       public static class OperatorBackendStateMetaInfoReaderV1 implements 
StateMetaInfoReader {
+
+               static final OperatorBackendStateMetaInfoReaderV1 INSTANCE = 
new OperatorBackendStateMetaInfoReaderV1();
+
+               @Nonnull
+               @Override
+               public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+                       @Nonnull DataInputView in,
+                       @Nonnull ClassLoader userCodeClassLoader) throws 
IOException {
+
+                       final String name = in.readUTF();
+                       final OperatorStateHandle.Mode mode = 
OperatorStateHandle.Mode.values()[in.readByte()];
+                       final Map<String, String> optionsMap = 
Collections.singletonMap(
+                               
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+                               mode.toString());
+
+                       DataInputViewStream dis = new DataInputViewStream(in);
+                       ClassLoader previousClassLoader = 
Thread.currentThread().getContextClassLoader();
+
+                       try (
+                               
InstantiationUtil.FailureTolerantObjectInputStream ois =
+                                       new 
InstantiationUtil.FailureTolerantObjectInputStream(dis, userCodeClassLoader)) {
+                               
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+                               TypeSerializer<?> stateSerializer = 
(TypeSerializer<?>) ois.readObject();
+                               return new StateMetaInfoSnapshot(
+                                       name,
+                                       
StateMetaInfoSnapshot.BackendStateType.OPERATOR,
+                                       optionsMap,
+                                       Collections.emptyMap(),
+                                       Collections.singletonMap(
+                                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+                                               stateSerializer));
+                       } catch (ClassNotFoundException exception) {
+                               throw new IOException(exception);
+                       } finally {
+                               
Thread.currentThread().setContextClassLoader(previousClassLoader);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoReader.java
new file mode 100644
index 0000000..4093ad6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Functional interface to read {@link StateMetaInfoSnapshot}.
+ */
+@FunctionalInterface
+public interface StateMetaInfoReader {
+
+       /**
+        * Reads a snapshot from the given input view.
+        *
+        * @param inputView the input to read from.
+        * @param userCodeClassLoader user classloader to deserialize the 
objects in the snapshot.
+        * @return the deserialized snapshot.
+        * @throws IOException on deserialization problems.
+        */
+       @Nonnull
+       StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+               @Nonnull DataInputView inputView,
+               @Nonnull ClassLoader userCodeClassLoader) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
new file mode 100644
index 0000000..9341a5a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Generalized snapshot for meta information about one state in a state backend
+ * (e.g. {@link 
org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo}).
+ */
+public class StateMetaInfoSnapshot {
+
+       /**
+        * Enum that defines the different types of state that live in Flink 
backends.
+        */
+       public enum BackendStateType {
+               KEY_VALUE,
+               OPERATOR,
+               BROADCAST,
+               TIMER
+       }
+
+       /**
+        * Predefined keys for the most common options in the meta info.
+        */
+       public enum CommonOptionsKeys {
+               /** Key to define the {@link StateDescriptor.Type} of a 
key/value keyed-state */
+               KEYED_STATE_TYPE,
+               /**
+                * Key to define {@link 
org.apache.flink.runtime.state.OperatorStateHandle.Mode}, about how operator 
state is
+                * distributed on restore
+                */
+               OPERATOR_STATE_DISTRIBUTION_MODE,
+       }
+
+       /**
+        * Predefined keys for the most common serializer types in the meta 
info.
+        */
+       public enum CommonSerializerKeys {
+               KEY_SERIALIZER,
+               NAMESPACE_SERIALIZER,
+               VALUE_SERIALIZER
+       }
+
+       /** The name of the state. */
+       @Nonnull
+       private final String name;
+
+       @Nonnull
+       private final BackendStateType backendStateType;
+
+       /** Map of options (encoded as strings) for the state. */
+       @Nonnull
+       private final Map<String, String> options;
+
+       /** The configurations of all the type serializers used with the state. 
*/
+       @Nonnull
+       private final Map<String, TypeSerializerConfigSnapshot> 
serializerConfigSnapshots;
+
+       // TODO this will go awy again after FLINK-9377 is merged, that is why 
it is currently duplicated here.
+       /** The serializers used by the state. */
+       @Nonnull
+       private final Map<String, TypeSerializer<?>> serializers;
+
+       public StateMetaInfoSnapshot(
+               @Nonnull String name,
+               @Nonnull BackendStateType backendStateType,
+               @Nonnull Map<String, String> options,
+               @Nonnull Map<String, TypeSerializerConfigSnapshot> 
serializerConfigSnapshots,
+               @Nonnull Map<String, TypeSerializer<?>> serializers) {
+               this.name = name;
+               this.backendStateType = backendStateType;
+               this.options = options;
+               this.serializerConfigSnapshots = serializerConfigSnapshots;
+               this.serializers = serializers;
+       }
+
+       @Nonnull
+       public BackendStateType getBackendStateType() {
+               return backendStateType;
+       }
+
+       @Nullable
+       public TypeSerializerConfigSnapshot 
getTypeSerializerConfigSnapshot(@Nonnull String key) {
+               return serializerConfigSnapshots.get(key);
+       }
+
+       @Nullable
+       public TypeSerializerConfigSnapshot 
getTypeSerializerConfigSnapshot(@Nonnull CommonSerializerKeys key) {
+               return getTypeSerializerConfigSnapshot(key.toString());
+       }
+
+       @Nullable
+       public String getOption(@Nonnull String key) {
+               return options.get(key);
+       }
+
+       @Nullable
+       public String getOption(@Nonnull 
StateMetaInfoSnapshot.CommonOptionsKeys key) {
+               return getOption(key.toString());
+       }
+
+       @Nonnull
+       public Map<String, String> getOptionsImmutable() {
+               return Collections.unmodifiableMap(options);
+       }
+
+       @Nonnull
+       public String getName() {
+               return name;
+       }
+
+       @Nullable
+       public TypeSerializer<?> getTypeSerializer(@Nonnull String key) {
+               return serializers.get(key);
+       }
+
+       @Nullable
+       public TypeSerializer<?> getTypeSerializer(@Nonnull 
CommonSerializerKeys key) {
+               return getTypeSerializer(key.toString());
+       }
+
+       @Nonnull
+       public Map<String, TypeSerializerConfigSnapshot> 
getSerializerConfigSnapshotsImmutable() {
+               return Collections.unmodifiableMap(serializerConfigSnapshots);
+       }
+
+       @Nonnull
+       public Map<String, TypeSerializer<?>> getSerializersImmutable() {
+               return Collections.unmodifiableMap(serializers);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
new file mode 100644
index 0000000..ce535ef
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static factory that gives out the write and readers for different versions 
of {@link StateMetaInfoSnapshot}.
+ */
+public class StateMetaInfoSnapshotReadersWriters {
+
+       /**
+        * Current version for the serialization format of {@link 
StateMetaInfoSnapshotReadersWriters}.
+        * - v5: Flink 1.6.x
+        */
+       public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5;
+
+       /**
+        * Enum for backeards compatibility. This gives a hint about the 
expected state type for which a
+        * {@link StateMetaInfoSnapshot} should be deserialized.
+        *
+        * TODO this can go away after we eventually drop backwards 
compatibility with all versions < 5.
+        */
+       public enum StateTypeHint {
+               KEYED_STATE,
+               OPERATOR_STATE
+       }
+
+       /**
+        * Returns the writer for {@link StateMetaInfoSnapshot}.
+        */
+       @Nonnull
+       public static StateMetaInfoWriter getWriter() {
+               return CurrentWriterImpl.INSTANCE;
+       }
+
+       /**
+        * Returns a reader for {@link StateMetaInfoSnapshot} with the 
requested state type and version number.
+        *
+        * @param readVersion the format version to read.
+        * @param stateTypeHint a hint about the expected type to read.
+        * @return the requested reader.
+        */
+       @Nonnull
+       public static StateMetaInfoReader getReader(int readVersion, @Nonnull 
StateTypeHint stateTypeHint) {
+
+               if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+                       // latest version shortcut
+                       return CurrentReaderImpl.INSTANCE;
+               }
+
+               if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+                       throw new IllegalArgumentException("Unsupported read 
version for state meta info: " + readVersion);
+               }
+
+               switch (stateTypeHint) {
+                       case KEYED_STATE:
+                               return 
getLegacyKeyedStateMetaInfoReader(readVersion);
+                       case OPERATOR_STATE:
+                               return 
getLegacyOperatorStateMetaInfoReader(readVersion);
+                       default:
+                               throw new IllegalArgumentException("Unsupported 
state type hint: " + stateTypeHint);
+               }
+       }
+
+       @Nonnull
+       private static StateMetaInfoReader 
getLegacyKeyedStateMetaInfoReader(int readVersion) {
+               switch (readVersion) {
+                       case 1:
+                       case 2:
+                               return 
LegacyStateMetaInfoReaders.KeyedBackendStateMetaInfoReaderV1V2.INSTANCE;
+                       case 3:
+                       case 4:
+                               return 
LegacyStateMetaInfoReaders.KeyedBackendStateMetaInfoReaderV3V4.INSTANCE;
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                       "Unrecognized keyed backend state meta 
info writer version: " + readVersion);
+               }
+       }
+
+       @Nonnull
+       private static StateMetaInfoReader 
getLegacyOperatorStateMetaInfoReader(int readVersion) {
+               switch (readVersion) {
+                       case 1:
+                               return 
LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV1.INSTANCE;
+                       case 2:
+                       case 3:
+                               return 
LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3.INSTANCE;
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                       "Unrecognized operator backend state 
meta info writer version: " + readVersion);
+               }
+       }
+
+       //----------------------------------------------------------
+
+       /**
+        * Implementation of {@link StateMetaInfoWriter}.
+        */
+       static class CurrentWriterImpl implements StateMetaInfoWriter {
+
+               private static final CurrentWriterImpl INSTANCE = new 
CurrentWriterImpl();
+
+               @Override
+               public void writeStateMetaInfoSnapshot(
+                       @Nonnull StateMetaInfoSnapshot snapshot,
+                       @Nonnull DataOutputView outputView) throws IOException {
+                       final Map<String, String> optionsMap = 
snapshot.getOptionsImmutable();
+                       final Map<String, TypeSerializer<?>> serializerMap = 
snapshot.getSerializersImmutable();
+                       final Map<String, TypeSerializerConfigSnapshot> 
serializerConfigSnapshotsMap =
+                               
snapshot.getSerializerConfigSnapshotsImmutable();
+                       Preconditions.checkState(serializerMap.size() == 
serializerConfigSnapshotsMap.size());
+
+                       outputView.writeUTF(snapshot.getName());
+                       
outputView.writeInt(snapshot.getBackendStateType().ordinal());
+                       outputView.writeInt(optionsMap.size());
+                       for (Map.Entry<String, String> entry : 
optionsMap.entrySet()) {
+                               outputView.writeUTF(entry.getKey());
+                               outputView.writeUTF(entry.getValue());
+                       }
+
+                       outputView.writeInt(serializerMap.size());
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersWithConfig =
+                               new ArrayList<>(serializerMap.size());
+
+                       for (Map.Entry<String, TypeSerializer<?>> entry : 
serializerMap.entrySet()) {
+                               final String key = entry.getKey();
+                               outputView.writeUTF(key);
+
+                               TypeSerializerConfigSnapshot 
configForSerializer =
+                                       
Preconditions.checkNotNull(serializerConfigSnapshotsMap.get(key));
+
+                               serializersWithConfig.add(new 
Tuple2<>(entry.getValue(), configForSerializer));
+                       }
+
+                       
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(outputView,
 serializersWithConfig);
+               }
+       }
+
+       /**
+        * Implementation of {@link StateMetaInfoReader} for the current 
version and generic for all state types.
+        */
+       static class CurrentReaderImpl implements StateMetaInfoReader {
+
+               private static final CurrentReaderImpl INSTANCE = new 
CurrentReaderImpl();
+
+               @Nonnull
+               @Override
+               public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+                       @Nonnull DataInputView inputView,
+                       @Nonnull ClassLoader userCodeClassLoader) throws 
IOException {
+
+                       final String stateName = inputView.readUTF();
+                       final StateMetaInfoSnapshot.BackendStateType stateType =
+                               
StateMetaInfoSnapshot.BackendStateType.values()[inputView.readInt()];
+                       final int numOptions = inputView.readInt();
+                       HashMap<String, String> optionsMap = new 
HashMap<>(numOptions);
+                       for (int i = 0; i < numOptions; ++i) {
+                               String key = inputView.readUTF();
+                               String value = inputView.readUTF();
+                               optionsMap.put(key, value);
+                       }
+                       final int numSerializer = inputView.readInt();
+                       final ArrayList<String> serializerKeys = new 
ArrayList<>(numSerializer);
+                       final HashMap<String, TypeSerializer<?>> serializerMap 
= new HashMap<>(numSerializer);
+                       final HashMap<String, TypeSerializerConfigSnapshot> 
serializerConfigsMap = new HashMap<>(numSerializer);
+
+                       for (int i = 0; i < numSerializer; ++i) {
+                               serializerKeys.add(inputView.readUTF());
+                       }
+                       final List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersWithConfig =
+                               
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(inputView,
 userCodeClassLoader);
+
+                       for (int i = 0; i < numSerializer; ++i) {
+                               String key = serializerKeys.get(i);
+                               final Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> serializerConfigTuple =
+                                       serializersWithConfig.get(i);
+                               serializerMap.put(key, 
serializerConfigTuple.f0);
+                               serializerConfigsMap.put(key, 
serializerConfigTuple.f1);
+                       }
+
+                       return new StateMetaInfoSnapshot(stateName, stateType, 
optionsMap, serializerConfigsMap, serializerMap);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoWriter.java
new file mode 100644
index 0000000..5af9e64
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.core.memory.DataOutputView;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Functional interface to write {@link StateMetaInfoSnapshot}.
+ */
+@FunctionalInterface
+public interface StateMetaInfoWriter {
+
+       /**
+        * Writes the given snapshot to the output view.
+        *
+        * @param snapshot the snapshot to write.
+        * @param outputView the output to write into.
+        * @throws IOException on write problems.
+        */
+       void writeStateMetaInfoSnapshot(
+               @Nonnull StateMetaInfoSnapshot snapshot,
+               @Nonnull DataOutputView outputView) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 3f78097..5241dd8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -29,6 +29,9 @@ import 
org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 
 import org.junit.Assert;
@@ -39,6 +42,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
+
 public class SerializationProxiesTest {
 
        @Test
@@ -48,7 +53,7 @@ public class SerializationProxiesTest {
                TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoList = new ArrayList<>();
+               List<StateMetaInfoSnapshot> stateMetaInfoList = new 
ArrayList<>();
 
                stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
                        StateDescriptor.Type.VALUE, "a", namespaceSerializer, 
stateSerializer).snapshot());
@@ -73,10 +78,10 @@ public class SerializationProxiesTest {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
-               Assert.assertEquals(true, 
serializationProxy.isUsingKeyGroupCompression());
+               
Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
                Assert.assertEquals(keySerializer, 
serializationProxy.getKeySerializer());
                Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
-               Assert.assertEquals(stateMetaInfoList, 
serializationProxy.getStateMetaInfoSnapshots());
+               assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList, 
serializationProxy.getStateMetaInfoSnapshots());
        }
 
        @Test
@@ -86,7 +91,7 @@ public class SerializationProxiesTest {
                TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoList = new ArrayList<>();
+               List<StateMetaInfoSnapshot> stateMetaInfoList = new 
ArrayList<>();
 
                stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
                        StateDescriptor.Type.VALUE, "a", namespaceSerializer, 
stateSerializer).snapshot());
@@ -126,11 +131,12 @@ public class SerializationProxiesTest {
                Assert.assertTrue(serializationProxy.getKeySerializer() 
instanceof UnloadableDummyTypeSerializer);
                Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
 
-               for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : 
serializationProxy.getStateMetaInfoSnapshots()) {
-                       Assert.assertTrue(meta.getNamespaceSerializer() 
instanceof UnloadableDummyTypeSerializer);
-                       Assert.assertTrue(meta.getStateSerializer() instanceof 
UnloadableDummyTypeSerializer);
-                       
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
meta.getNamespaceSerializerConfigSnapshot());
-                       
Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
meta.getStateSerializerConfigSnapshot());
+               for (StateMetaInfoSnapshot snapshot : 
serializationProxy.getStateMetaInfoSnapshots()) {
+                       final RegisteredKeyedBackendStateMetaInfo<?, ?> 
restoredMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(snapshot);
+                       
Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof 
UnloadableDummyTypeSerializer);
+                       Assert.assertTrue(restoredMetaInfo.getStateSerializer() 
instanceof UnloadableDummyTypeSerializer);
+                       
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
+                       
Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
                }
        }
 
@@ -141,22 +147,21 @@ public class SerializationProxiesTest {
                TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = 
new RegisteredKeyedBackendStateMetaInfo<>(
+               StateMetaInfoSnapshot metaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
                        StateDescriptor.Type.VALUE, name, namespaceSerializer, 
stateSerializer).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       KeyedBackendStateMetaInfoSnapshotReaderWriters
-                               
.getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
-                               .writeStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
-
+                       StateMetaInfoSnapshotReadersWriters.getWriter().
+                               writeStateMetaInfoSnapshot(metaInfo, new 
DataOutputViewStreamWrapper(out));
                        serialized = out.toByteArray();
                }
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       metaInfo = 
KeyedBackendStateMetaInfoSnapshotReaderWriters
-                               
.getReaderForVersion(KeyedBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
-                               .readStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+                       final StateMetaInfoReader reader = 
StateMetaInfoSnapshotReadersWriters.getReader(
+                               CURRENT_STATE_META_INFO_SNAPSHOT_VERSION, 
StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);
+                       metaInfo = reader.readStateMetaInfoSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
                Assert.assertEquals(name, metaInfo.getName());
@@ -168,15 +173,13 @@ public class SerializationProxiesTest {
                TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = 
new RegisteredKeyedBackendStateMetaInfo<>(
+               StateMetaInfoSnapshot snapshot = new 
RegisteredKeyedBackendStateMetaInfo<>(
                        StateDescriptor.Type.VALUE, name, namespaceSerializer, 
stateSerializer).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       KeyedBackendStateMetaInfoSnapshotReaderWriters
-                               
.getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
-                               .writeStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
-
+                       StateMetaInfoSnapshotReadersWriters.getWriter().
+                               writeStateMetaInfoSnapshot(snapshot, new 
DataOutputViewStreamWrapper(out));
                        serialized = out.toByteArray();
                }
 
@@ -185,20 +188,23 @@ public class SerializationProxiesTest {
                
cnfThrowingSerializerClasses.add(DoubleSerializer.class.getName());
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       metaInfo = 
KeyedBackendStateMetaInfoSnapshotReaderWriters
-                               .getReaderForVersion(
-                                       KeyedBackendSerializationProxy.VERSION,
-                                       new 
ArtificialCNFExceptionThrowingClassLoader(
-                                               
Thread.currentThread().getContextClassLoader(),
-                                               cnfThrowingSerializerClasses))
-                               .readStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+                       final StateMetaInfoReader reader = 
StateMetaInfoSnapshotReadersWriters.getReader(
+                               CURRENT_STATE_META_INFO_SNAPSHOT_VERSION, 
StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);
+                       final ClassLoader classLoader = new 
ArtificialCNFExceptionThrowingClassLoader(
+                               Thread.currentThread().getContextClassLoader(),
+                               cnfThrowingSerializerClasses);
+
+                       snapshot = reader.readStateMetaInfoSnapshot(
+                               new DataInputViewStreamWrapper(in), 
classLoader);
                }
 
-               Assert.assertEquals(name, metaInfo.getName());
-               Assert.assertTrue(metaInfo.getNamespaceSerializer() instanceof 
UnloadableDummyTypeSerializer);
-               Assert.assertTrue(metaInfo.getStateSerializer() instanceof 
UnloadableDummyTypeSerializer);
-               
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
metaInfo.getNamespaceSerializerConfigSnapshot());
-               Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
metaInfo.getStateSerializerConfigSnapshot());
+               RegisteredKeyedBackendStateMetaInfo<?, ?> restoredMetaInfo = 
new RegisteredKeyedBackendStateMetaInfo<>(snapshot);
+
+               Assert.assertEquals(name, restoredMetaInfo.getName());
+               Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() 
instanceof UnloadableDummyTypeSerializer);
+               Assert.assertTrue(restoredMetaInfo.getStateSerializer() 
instanceof UnloadableDummyTypeSerializer);
+               
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
+               Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
        }
 
        @Test
@@ -208,7 +214,7 @@ public class SerializationProxiesTest {
                TypeSerializer<?> keySerializer = DoubleSerializer.INSTANCE;
                TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
 
-               List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
stateMetaInfoSnapshots = new ArrayList<>();
+               List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new 
ArrayList<>();
 
                stateMetaInfoSnapshots.add(new 
RegisteredOperatorBackendStateMetaInfo<>(
                        "a", stateSerializer, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
@@ -217,7 +223,7 @@ public class SerializationProxiesTest {
                stateMetaInfoSnapshots.add(new 
RegisteredOperatorBackendStateMetaInfo<>(
                        "c", stateSerializer, 
OperatorStateHandle.Mode.UNION).snapshot());
 
-               List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> 
broadcastStateMetaInfoSnapshots = new ArrayList<>();
+               List<StateMetaInfoSnapshot> broadcastStateMetaInfoSnapshots = 
new ArrayList<>();
 
                broadcastStateMetaInfoSnapshots.add(new 
RegisteredBroadcastBackendStateMetaInfo<>(
                                "d", OperatorStateHandle.Mode.BROADCAST, 
keySerializer, valueSerializer).snapshot());
@@ -240,8 +246,8 @@ public class SerializationProxiesTest {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
-               Assert.assertEquals(stateMetaInfoSnapshots, 
serializationProxy.getOperatorStateMetaInfoSnapshots());
-               Assert.assertEquals(broadcastStateMetaInfoSnapshots, 
serializationProxy.getBroadcastStateMetaInfoSnapshots());
+               assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoSnapshots, 
serializationProxy.getOperatorStateMetaInfoSnapshots());
+               
assertEqualStateMetaInfoSnapshotsLists(broadcastStateMetaInfoSnapshots, 
serializationProxy.getBroadcastStateMetaInfoSnapshots());
        }
 
        @Test
@@ -250,28 +256,30 @@ public class SerializationProxiesTest {
                String name = "test";
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+               StateMetaInfoSnapshot snapshot =
                        new RegisteredOperatorBackendStateMetaInfo<>(
                                name, stateSerializer, 
OperatorStateHandle.Mode.UNION).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       OperatorBackendStateMetaInfoSnapshotReaderWriters
-                               
.getOperatorStateWriterForVersion(OperatorBackendSerializationProxy.VERSION, 
metaInfo)
-                               .writeOperatorStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
+                       
StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(snapshot,
 new DataOutputViewStreamWrapper(out));
 
                        serialized = out.toByteArray();
                }
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
-                               
.getOperatorStateReaderForVersion(OperatorBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
-                               .readOperatorStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+                       final StateMetaInfoReader reader = 
StateMetaInfoSnapshotReadersWriters.getReader(
+                               CURRENT_STATE_META_INFO_SNAPSHOT_VERSION, 
StateMetaInfoSnapshotReadersWriters.StateTypeHint.OPERATOR_STATE);
+                       snapshot = reader.readStateMetaInfoSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
-               Assert.assertEquals(name, metaInfo.getName());
-               Assert.assertEquals(OperatorStateHandle.Mode.UNION, 
metaInfo.getAssignmentMode());
-               Assert.assertEquals(stateSerializer, 
metaInfo.getPartitionStateSerializer());
+               RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
+                       new RegisteredOperatorBackendStateMetaInfo<>(snapshot);
+
+               Assert.assertEquals(name, restoredMetaInfo.getName());
+               Assert.assertEquals(OperatorStateHandle.Mode.UNION, 
restoredMetaInfo.getAssignmentMode());
+               Assert.assertEquals(stateSerializer, 
restoredMetaInfo.getPartitionStateSerializer());
        }
 
        @Test
@@ -281,29 +289,34 @@ public class SerializationProxiesTest {
                TypeSerializer<?> keySerializer = DoubleSerializer.INSTANCE;
                TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
 
-               RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> metaInfo 
=
-                               new RegisteredBroadcastBackendStateMetaInfo<>(
-                                               name, 
OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot();
+               StateMetaInfoSnapshot snapshot =
+                       new RegisteredBroadcastBackendStateMetaInfo<>(
+                               name, OperatorStateHandle.Mode.BROADCAST, 
keySerializer, valueSerializer).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       OperatorBackendStateMetaInfoSnapshotReaderWriters
-                                       
.getBroadcastStateWriterForVersion(OperatorBackendSerializationProxy.VERSION, 
metaInfo)
-                                       .writeBroadcastStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
+                       StateMetaInfoSnapshotReadersWriters.getWriter().
+                               writeStateMetaInfoSnapshot(snapshot, new 
DataOutputViewStreamWrapper(out));
 
                        serialized = out.toByteArray();
                }
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
-                                       
.getBroadcastStateReaderForVersion(OperatorBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
-                                       .readBroadcastStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+                       final StateMetaInfoReader reader = 
StateMetaInfoSnapshotReadersWriters.getReader(
+                               CURRENT_STATE_META_INFO_SNAPSHOT_VERSION, 
StateMetaInfoSnapshotReadersWriters.StateTypeHint.OPERATOR_STATE);
+                       snapshot = reader.readStateMetaInfoSnapshot(
+                               new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader());
                }
 
-               Assert.assertEquals(name, metaInfo.getName());
-               Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST, 
metaInfo.getAssignmentMode());
-               Assert.assertEquals(keySerializer, metaInfo.getKeySerializer());
-               Assert.assertEquals(valueSerializer, 
metaInfo.getValueSerializer());
+               RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
+                       new RegisteredBroadcastBackendStateMetaInfo<>(snapshot);
+
+               Assert.assertEquals(name, restoredMetaInfo.getName());
+               Assert.assertEquals(
+                       OperatorStateHandle.Mode.BROADCAST,
+                       restoredMetaInfo.getAssignmentMode());
+               Assert.assertEquals(keySerializer, 
restoredMetaInfo.getKeySerializer());
+               Assert.assertEquals(valueSerializer, 
restoredMetaInfo.getValueSerializer());
        }
 
        @Test
@@ -311,16 +324,14 @@ public class SerializationProxiesTest {
                String name = "test";
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-               RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+               StateMetaInfoSnapshot snapshot =
                        new RegisteredOperatorBackendStateMetaInfo<>(
                                name, stateSerializer, 
OperatorStateHandle.Mode.UNION).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       OperatorBackendStateMetaInfoSnapshotReaderWriters
-                               
.getOperatorStateWriterForVersion(OperatorBackendSerializationProxy.VERSION, 
metaInfo)
-                               .writeOperatorStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
-
+                       StateMetaInfoSnapshotReadersWriters.getWriter().
+                               writeStateMetaInfoSnapshot(snapshot, new 
DataOutputViewStreamWrapper(out));
                        serialized = out.toByteArray();
                }
 
@@ -329,18 +340,22 @@ public class SerializationProxiesTest {
                
cnfThrowingSerializerClasses.add(StringSerializer.class.getName());
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
-                               .getOperatorStateReaderForVersion(
-                                       
OperatorBackendSerializationProxy.VERSION,
-                                       new 
ArtificialCNFExceptionThrowingClassLoader(
-                                               
Thread.currentThread().getContextClassLoader(),
-                                               cnfThrowingSerializerClasses))
-                               .readOperatorStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+                       final StateMetaInfoReader reader = 
StateMetaInfoSnapshotReadersWriters.getReader(
+                               CURRENT_STATE_META_INFO_SNAPSHOT_VERSION, 
StateMetaInfoSnapshotReadersWriters.StateTypeHint.OPERATOR_STATE);
+                       final ClassLoader classLoader = new 
ArtificialCNFExceptionThrowingClassLoader(
+                               Thread.currentThread().getContextClassLoader(),
+                               cnfThrowingSerializerClasses);
+                       snapshot = reader.readStateMetaInfoSnapshot(new 
DataInputViewStreamWrapper(in), classLoader);
                }
 
-               Assert.assertEquals(name, metaInfo.getName());
-               Assert.assertTrue(metaInfo.getPartitionStateSerializer() 
instanceof UnloadableDummyTypeSerializer);
-               Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
metaInfo.getPartitionStateSerializerConfigSnapshot());
+               RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
+                       new RegisteredOperatorBackendStateMetaInfo<>(snapshot);
+
+               Assert.assertEquals(name, restoredMetaInfo.getName());
+               
Assert.assertTrue(restoredMetaInfo.getPartitionStateSerializer() instanceof 
UnloadableDummyTypeSerializer);
+               Assert.assertEquals(
+                       stateSerializer.snapshotConfiguration(),
+                       
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
        }
 
        @Test
@@ -349,15 +364,14 @@ public class SerializationProxiesTest {
                TypeSerializer<?> keySerializer = DoubleSerializer.INSTANCE;
                TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
 
-               RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> 
broadcastMetaInfo =
-                               new RegisteredBroadcastBackendStateMetaInfo<>(
-                                               broadcastName, 
OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot();
+               StateMetaInfoSnapshot snapshot =
+                       new RegisteredBroadcastBackendStateMetaInfo<>(
+                               broadcastName, 
OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot();
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
-                       OperatorBackendStateMetaInfoSnapshotReaderWriters
-                                       
.getBroadcastStateWriterForVersion(OperatorBackendSerializationProxy.VERSION, 
broadcastMetaInfo)
-                                       .writeBroadcastStateMetaInfo(new 
DataOutputViewStreamWrapper(out));
+                       StateMetaInfoSnapshotReadersWriters.getWriter().
+                               writeStateMetaInfoSnapshot(snapshot, new 
DataOutputViewStreamWrapper(out));
 
                        serialized = out.toByteArray();
                }
@@ -367,20 +381,27 @@ public class SerializationProxiesTest {
                
cnfThrowingSerializerClasses.add(StringSerializer.class.getName());
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
-                       broadcastMetaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
-                                       .getBroadcastStateReaderForVersion(
-                                               
OperatorBackendSerializationProxy.VERSION,
-                                               new 
ArtificialCNFExceptionThrowingClassLoader(
-                                                       
Thread.currentThread().getContextClassLoader(),
-                                                       
cnfThrowingSerializerClasses))
-                                       .readBroadcastStateMetaInfo(new 
DataInputViewStreamWrapper(in));
+                       final StateMetaInfoReader reader =
+                               StateMetaInfoSnapshotReadersWriters.getReader(
+                                       
CURRENT_STATE_META_INFO_SNAPSHOT_VERSION,
+                                       
StateMetaInfoSnapshotReadersWriters.StateTypeHint.OPERATOR_STATE);
+
+                       final ClassLoader classLoader = new 
ArtificialCNFExceptionThrowingClassLoader(
+                               Thread.currentThread().getContextClassLoader(),
+                               cnfThrowingSerializerClasses);
+
+                       snapshot = reader.readStateMetaInfoSnapshot(new 
DataInputViewStreamWrapper(in), classLoader);
                }
 
-               Assert.assertEquals(broadcastName, broadcastMetaInfo.getName());
-               Assert.assertTrue(broadcastMetaInfo.getKeySerializer() 
instanceof UnloadableDummyTypeSerializer);
-               Assert.assertEquals(keySerializer.snapshotConfiguration(), 
broadcastMetaInfo.getKeySerializerConfigSnapshot());
-               Assert.assertTrue(broadcastMetaInfo.getValueSerializer() 
instanceof UnloadableDummyTypeSerializer);
-               Assert.assertEquals(valueSerializer.snapshotConfiguration(), 
broadcastMetaInfo.getValueSerializerConfigSnapshot());
+               RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
+                       new RegisteredBroadcastBackendStateMetaInfo<>(snapshot);
+
+               Assert.assertEquals(broadcastName, restoredMetaInfo.getName());
+               Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST, 
restoredMetaInfo.getAssignmentMode());
+               Assert.assertTrue(restoredMetaInfo.getKeySerializer() 
instanceof UnloadableDummyTypeSerializer);
+               Assert.assertEquals(keySerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
+               Assert.assertTrue(restoredMetaInfo.getValueSerializer() 
instanceof UnloadableDummyTypeSerializer);
+               Assert.assertEquals(valueSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
        }
 
        /**
@@ -400,4 +421,23 @@ public class SerializationProxiesTest {
                Assert.assertEquals(5, 
StateDescriptor.Type.AGGREGATING.ordinal());
                Assert.assertEquals(6, StateDescriptor.Type.MAP.ordinal());
        }
+
+       private void assertEqualStateMetaInfoSnapshotsLists(
+               List<StateMetaInfoSnapshot> expected,
+               List<StateMetaInfoSnapshot> actual) {
+               Assert.assertEquals(expected.size(), actual.size());
+               for (int i = 0; i < expected.size(); ++i) {
+                       assertEqualStateMetaInfoSnapshots(expected.get(i), 
actual.get(i));
+               }
+       }
+
+       private void assertEqualStateMetaInfoSnapshots(StateMetaInfoSnapshot 
expected, StateMetaInfoSnapshot actual) {
+               Assert.assertEquals(expected.getName(), actual.getName());
+               Assert.assertEquals(expected.getBackendStateType(), 
actual.getBackendStateType());
+               Assert.assertEquals(expected.getOptionsImmutable(), 
actual.getOptionsImmutable());
+               Assert.assertEquals(expected.getSerializersImmutable(), 
actual.getSerializersImmutable());
+               Assert.assertEquals(
+                       expected.getSerializerConfigSnapshotsImmutable(),
+                       actual.getSerializerConfigSnapshotsImmutable());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 4f36d62..cf6bcc8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -54,16 +54,16 @@ public class CopyOnWriteStateTableTest extends TestLogger {
        @Test
        public void testPutGetRemoveContainsTransform() throws Exception {
                RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
-                               new RegisteredKeyedBackendStateMetaInfo<>(
-                                               StateDescriptor.Type.UNKNOWN,
-                                               "test",
-                                               IntSerializer.INSTANCE,
-                                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+                       new RegisteredKeyedBackendStateMetaInfo<>(
+                               StateDescriptor.Type.UNKNOWN,
+                               "test",
+                               IntSerializer.INSTANCE,
+                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
 
                final MockInternalKeyContext<Integer> keyContext = new 
MockInternalKeyContext<>(IntSerializer.INSTANCE);
 
                final CopyOnWriteStateTable<Integer, Integer, 
ArrayList<Integer>> stateTable =
-                               new CopyOnWriteStateTable<>(keyContext, 
metaInfo);
+                       new CopyOnWriteStateTable<>(keyContext, metaInfo);
 
                ArrayList<Integer> state_1_1 = new ArrayList<>();
                state_1_1.add(41);
@@ -106,13 +106,13 @@ public class CopyOnWriteStateTableTest extends TestLogger 
{
                Assert.assertEquals(1, stateTable.size());
 
                StateTransformationFunction<ArrayList<Integer>, Integer> 
function =
-                               new 
StateTransformationFunction<ArrayList<Integer>, Integer>() {
-                                       @Override
-                                       public ArrayList<Integer> 
apply(ArrayList<Integer> previousState, Integer value) throws Exception {
-                                               previousState.add(value);
-                                               return previousState;
-                                       }
-                               };
+                       new StateTransformationFunction<ArrayList<Integer>, 
Integer>() {
+                               @Override
+                               public ArrayList<Integer> 
apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+                                       previousState.add(value);
+                                       return previousState;
+                               }
+                       };
 
                final int value = 4711;
                stateTable.transform(1, 1, value, function);
@@ -126,16 +126,16 @@ public class CopyOnWriteStateTableTest extends TestLogger 
{
        @Test
        public void testIncrementalRehash() {
                RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
-                               new RegisteredKeyedBackendStateMetaInfo<>(
-                                               StateDescriptor.Type.UNKNOWN,
-                                               "test",
-                                               IntSerializer.INSTANCE,
-                                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+                       new RegisteredKeyedBackendStateMetaInfo<>(
+                               StateDescriptor.Type.UNKNOWN,
+                               "test",
+                               IntSerializer.INSTANCE,
+                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
 
                final MockInternalKeyContext<Integer> keyContext = new 
MockInternalKeyContext<>(IntSerializer.INSTANCE);
 
                final CopyOnWriteStateTable<Integer, Integer, 
ArrayList<Integer>> stateTable =
-                               new CopyOnWriteStateTable<>(keyContext, 
metaInfo);
+                       new CopyOnWriteStateTable<>(keyContext, metaInfo);
 
                int insert = 0;
                int remove = 0;
@@ -171,16 +171,16 @@ public class CopyOnWriteStateTableTest extends TestLogger 
{
        public void testRandomModificationsAndCopyOnWriteIsolation() throws 
Exception {
 
                final RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
-                               new RegisteredKeyedBackendStateMetaInfo<>(
-                                               StateDescriptor.Type.UNKNOWN,
-                                               "test",
-                                               IntSerializer.INSTANCE,
-                                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+                       new RegisteredKeyedBackendStateMetaInfo<>(
+                               StateDescriptor.Type.UNKNOWN,
+                               "test",
+                               IntSerializer.INSTANCE,
+                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
 
                final MockInternalKeyContext<Integer> keyContext = new 
MockInternalKeyContext<>(IntSerializer.INSTANCE);
 
                final CopyOnWriteStateTable<Integer, Integer, 
ArrayList<Integer>> stateTable =
-                               new CopyOnWriteStateTable<>(keyContext, 
metaInfo);
+                       new CopyOnWriteStateTable<>(keyContext, metaInfo);
 
                final HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> 
referenceMap = new HashMap<>();
 
@@ -200,17 +200,17 @@ public class CopyOnWriteStateTableTest extends TestLogger 
{
                int referencedSnapshotId = 0;
 
                final StateTransformationFunction<ArrayList<Integer>, Integer> 
transformationFunction =
-                               new 
StateTransformationFunction<ArrayList<Integer>, Integer>() {
-                                       @Override
-                                       public ArrayList<Integer> 
apply(ArrayList<Integer> previousState, Integer value) throws Exception {
-                                               if (previousState == null) {
-                                                       previousState = new 
ArrayList<>();
-                                               }
-                                               previousState.add(value);
-                                               // we give back the original, 
attempting to spot errors in to copy-on-write
-                                               return previousState;
+                       new StateTransformationFunction<ArrayList<Integer>, 
Integer>() {
+                               @Override
+                               public ArrayList<Integer> 
apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+                                       if (previousState == null) {
+                                               previousState = new 
ArrayList<>();
                                        }
-                               };
+                                       previousState.add(value);
+                                       // we give back the original, 
attempting to spot errors in to copy-on-write
+                                       return previousState;
+                               }
+                       };
 
                // the main loop for modifications
                for (int i = 0; i < 10_000_000; ++i) {
@@ -261,7 +261,7 @@ public class CopyOnWriteStateTableTest extends TestLogger {
                                        final int updateValue = 
random.nextInt(1000);
                                        stateTable.transform(key, namespace, 
updateValue, transformationFunction);
                                        referenceMap.put(compositeKey, 
transformationFunction.apply(
-                                                       
referenceMap.remove(compositeKey), updateValue));
+                                               
referenceMap.remove(compositeKey), updateValue));
                                        break;
                                }
                                default: {
@@ -326,16 +326,16 @@ public class CopyOnWriteStateTableTest extends TestLogger 
{
        @Test
        public void testCopyOnWriteContracts() {
                RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
-                               new RegisteredKeyedBackendStateMetaInfo<>(
-                                               StateDescriptor.Type.UNKNOWN,
-                                               "test",
-                                               IntSerializer.INSTANCE,
-                                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+                       new RegisteredKeyedBackendStateMetaInfo<>(
+                               StateDescriptor.Type.UNKNOWN,
+                               "test",
+                               IntSerializer.INSTANCE,
+                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
 
                final MockInternalKeyContext<Integer> keyContext = new 
MockInternalKeyContext<>(IntSerializer.INSTANCE);
 
                final CopyOnWriteStateTable<Integer, Integer, 
ArrayList<Integer>> stateTable =
-                               new CopyOnWriteStateTable<>(keyContext, 
metaInfo);
+                       new CopyOnWriteStateTable<>(keyContext, metaInfo);
 
                ArrayList<Integer> originalState1 = new ArrayList<>(1);
                ArrayList<Integer> originalState2 = new ArrayList<>(1);
@@ -477,8 +477,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 
        @SuppressWarnings("unchecked")
        private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(
-                       HashMap<Tuple2<Integer, Integer>,
-                                       ArrayList<Integer>> map) {
+               HashMap<Tuple2<Integer, Integer>,
+                       ArrayList<Integer>> map) {
 
                Tuple3<Integer, Integer, ArrayList<Integer>>[] result = new 
Tuple3[map.size()];
                int pos = 0;
@@ -491,8 +491,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
        }
 
        private void deepCheck(
-                       Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
-                       Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
+               Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
+               Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
 
                if (a == b) {
                        return;
@@ -501,14 +501,14 @@ public class CopyOnWriteStateTableTest extends TestLogger 
{
                Assert.assertEquals(a.length, b.length);
 
                Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> 
comparator =
-                               new Comparator<Tuple3<Integer, Integer, 
ArrayList<Integer>>>() {
+                       new Comparator<Tuple3<Integer, Integer, 
ArrayList<Integer>>>() {
 
-                                       @Override
-                                       public int compare(Tuple3<Integer, 
Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> 
o2) {
-                                               int namespaceDiff = o1.f1 - 
o2.f1;
-                                               return namespaceDiff != 0 ? 
namespaceDiff : o1.f0 - o2.f0;
-                                       }
-                               };
+                               @Override
+                               public int compare(Tuple3<Integer, Integer, 
ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) {
+                                       int namespaceDiff = o1.f1 - o2.f1;
+                                       return namespaceDiff != 0 ? 
namespaceDiff : o1.f0 - o2.f0;
+                               }
+                       };
 
                Arrays.sort(a, comparator);
                Arrays.sort(b, comparator);

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
index 0c8e8fe..45c86c7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -47,17 +47,17 @@ public class StateTableSnapshotCompatibilityTest {
        public void checkCompatibleSerializationFormats() throws IOException {
                final Random r = new Random(42);
                RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
-                               new RegisteredKeyedBackendStateMetaInfo<>(
-                                               StateDescriptor.Type.UNKNOWN,
-                                               "test",
-                                               IntSerializer.INSTANCE,
-                                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE));
+                       new RegisteredKeyedBackendStateMetaInfo<>(
+                               StateDescriptor.Type.UNKNOWN,
+                               "test",
+                               IntSerializer.INSTANCE,
+                               new 
ArrayListSerializer<>(IntSerializer.INSTANCE));
 
                final CopyOnWriteStateTableTest.MockInternalKeyContext<Integer> 
keyContext =
-                               new 
CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE);
+                       new 
CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE);
 
                CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> 
cowStateTable =
-                               new CopyOnWriteStateTable<>(keyContext, 
metaInfo);
+                       new CopyOnWriteStateTable<>(keyContext, metaInfo);
 
                for (int i = 0; i < 100; ++i) {
                        ArrayList<Integer> list = new ArrayList<>(5);
@@ -72,7 +72,7 @@ public class StateTableSnapshotCompatibilityTest {
                StateSnapshot snapshot = cowStateTable.createSnapshot();
 
                final NestedMapsStateTable<Integer, Integer, 
ArrayList<Integer>> nestedMapsStateTable =
-                               new NestedMapsStateTable<>(keyContext, 
metaInfo);
+                       new NestedMapsStateTable<>(keyContext, metaInfo);
 
                restoreStateTableFromSnapshot(nestedMapsStateTable, snapshot, 
keyContext.getKeyGroupRange());
                snapshot.release();
@@ -96,9 +96,9 @@ public class StateTableSnapshotCompatibilityTest {
        }
 
        private static <K, N, S> void restoreStateTableFromSnapshot(
-                       StateTable<K, N, S> stateTable,
-                       StateSnapshot snapshot,
-                       KeyGroupRange keyGroupRange) throws IOException {
+               StateTable<K, N, S> stateTable,
+               StateSnapshot snapshot,
+               KeyGroupRange keyGroupRange) throws IOException {
 
                final ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos(1024 * 1024);
                final DataOutputViewStreamWrapper dov = new 
DataOutputViewStreamWrapper(out);
@@ -111,7 +111,7 @@ public class StateTableSnapshotCompatibilityTest {
                final DataInputViewStreamWrapper div = new 
DataInputViewStreamWrapper(in);
 
                final StateTableByKeyGroupReader keyGroupReader =
-                               
StateTableByKeyGroupReaders.readerForVersion(stateTable, 
KeyedBackendSerializationProxy.VERSION);
+                       
StateTableByKeyGroupReaders.readerForVersion(stateTable, 
KeyedBackendSerializationProxy.VERSION);
 
                for (Integer keyGroup : keyGroupRange) {
                        keyGroupReader.readMappingsInKeyGroup(div, keyGroup);

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
new file mode 100644
index 0000000..409c796
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.metainfo;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test fixes the enum constants in {@link StateMetaInfoSnapshot} because 
any changes can break backwards
+ * compatibility. Consider this before changing this test.
+ */
+public class StateMetaInfoSnapshotEnumConstantsTest {
+
+       @Test
+       public void testFixedBackendStateTypeEnumConstants() {
+               Assert.assertEquals(4, 
StateMetaInfoSnapshot.BackendStateType.values().length);
+               Assert.assertEquals(0, 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.ordinal());
+               Assert.assertEquals(1, 
StateMetaInfoSnapshot.BackendStateType.OPERATOR.ordinal());
+               Assert.assertEquals(2, 
StateMetaInfoSnapshot.BackendStateType.BROADCAST.ordinal());
+               Assert.assertEquals(3, 
StateMetaInfoSnapshot.BackendStateType.TIMER.ordinal());
+               Assert.assertEquals("KEY_VALUE", 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.toString());
+               Assert.assertEquals("OPERATOR", 
StateMetaInfoSnapshot.BackendStateType.OPERATOR.toString());
+               Assert.assertEquals("BROADCAST", 
StateMetaInfoSnapshot.BackendStateType.BROADCAST.toString());
+               Assert.assertEquals("TIMER", 
StateMetaInfoSnapshot.BackendStateType.TIMER.toString());
+       }
+
+       @Test
+       public void testFixedOptionsEnumConstants() {
+               Assert.assertEquals(2, 
StateMetaInfoSnapshot.CommonOptionsKeys.values().length);
+               Assert.assertEquals(0, 
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.ordinal());
+               Assert.assertEquals(1, 
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.ordinal());
+               Assert.assertEquals("KEYED_STATE_TYPE", 
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString());
+               Assert.assertEquals("OPERATOR_STATE_DISTRIBUTION_MODE", 
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString());
+       }
+
+       @Test
+       public void testFixedSerializerEnumConstants() {
+               Assert.assertEquals(3, 
StateMetaInfoSnapshot.CommonSerializerKeys.values().length);
+               Assert.assertEquals(0, 
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.ordinal());
+               Assert.assertEquals(1, 
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.ordinal());
+               Assert.assertEquals(2, 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.ordinal());
+               Assert.assertEquals("KEY_SERIALIZER", 
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString());
+               Assert.assertEquals("NAMESPACE_SERIALIZER", 
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString());
+               Assert.assertEquals("VALUE_SERIALIZER", 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index d814c26..cf7974f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -58,12 +58,13 @@ class RocksDBFoldingState<K, N, T, ACC>
         * @param foldFunction The fold function used for folding state.
         * @param backend The backend for which this state is bind to.
         */
-       private RocksDBFoldingState(ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<ACC> valueSerializer,
-                       ACC defaultValue,
-                       FoldFunction<T, ACC> foldFunction,
-                       RocksDBKeyedStateBackend<K> backend) {
+       private RocksDBFoldingState(
+               ColumnFamilyHandle columnFamily,
+               TypeSerializer<N> namespaceSerializer,
+               TypeSerializer<ACC> valueSerializer,
+               ACC defaultValue,
+               FoldFunction<T, ACC> foldFunction,
+               RocksDBKeyedStateBackend<K> backend) {
 
                super(columnFamily, namespaceSerializer, valueSerializer, 
defaultValue, backend);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ac0f27/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f2430ae..ad26b9f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -87,6 +87,7 @@ import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
 import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -230,7 +231,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * <p>TODO this map can be removed when eager-state registration is in 
place.
         * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
         */
-       private final Map<String, 
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
+       private final Map<String, StateMetaInfoSnapshot> 
restoredKvStateMetaInfos;
 
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
@@ -337,6 +338,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
+       @SuppressWarnings("unchecked")
        @Override
        public <N> Stream<K> getKeys(String state, N namespace) {
                Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = 
kvStateInformation.get(state);
@@ -668,11 +670,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
                                SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
 
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
+                       List<StateMetaInfoSnapshot> restoredMetaInfos =
                                serializationProxy.getStateMetaInfoSnapshots();
                        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
 
-                       for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
+                       for (StateMetaInfoSnapshot restoredMetaInfo : 
restoredMetaInfos) {
 
                                Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
                                        
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
@@ -685,11 +687,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                
rocksDBKeyedStateBackend.columnOptions);
 
                                        RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
-                                               new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                                       
restoredMetaInfo.getStateType(),
-                                                       
restoredMetaInfo.getName(),
-                                                       
restoredMetaInfo.getNamespaceSerializer(),
-                                                       
restoredMetaInfo.getStateSerializer());
+                                               new 
RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
 
                                        
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
 
@@ -796,7 +794,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) 
throws Exception {
 
                        IncrementalLocalKeyedStateHandle localKeyedStateHandle;
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots;
+                       List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
                        List<ColumnFamilyDescriptor> columnFamilyDescriptors;
 
                        // Recovery from remote incremental state.
@@ -930,13 +928,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        private final List<ColumnFamilyDescriptor> 
columnFamilyDescriptors;
 
                        @Nonnull
-                       private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
+                       private final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots;
 
                        RestoredDBInstance(
                                @Nonnull RocksDB db,
                                @Nonnull List<ColumnFamilyHandle> 
columnFamilyHandles,
                                @Nonnull List<ColumnFamilyDescriptor> 
columnFamilyDescriptors,
-                               @Nonnull 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoSnapshots) {
+                               @Nonnull List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots) {
                                this.db = db;
                                this.columnFamilyHandles = columnFamilyHandles;
                                this.defaultColumnFamilyHandle = 
this.columnFamilyHandles.remove(0);
@@ -964,7 +962,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        transferAllStateDataToDirectory(restoreStateHandle, 
temporaryRestoreInstancePath);
 
                        // read meta data
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots =
+                       List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                                
readMetaData(restoreStateHandle.getMetaStateHandle());
 
                        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
@@ -984,18 +982,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
                        ColumnFamilyDescriptor columnFamilyDescriptor,
                        ColumnFamilyHandle columnFamilyHandle,
-                       RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
stateMetaInfoSnapshot) throws RocksDBException {
+                       StateMetaInfoSnapshot stateMetaInfoSnapshot) throws 
RocksDBException {
 
                        Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
                                
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
                        if (null == registeredStateMetaInfoEntry) {
                                RegisteredKeyedBackendStateMetaInfo<?, ?> 
stateMetaInfo =
-                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                               
stateMetaInfoSnapshot.getStateType(),
-                                               stateMetaInfoSnapshot.getName(),
-                                               
stateMetaInfoSnapshot.getNamespaceSerializer(),
-                                               
stateMetaInfoSnapshot.getStateSerializer());
+                                       new 
RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
 
                                registeredStateMetaInfoEntry =
                                        new Tuple2<>(
@@ -1071,12 +1065,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * This method recreates and registers all {@link 
ColumnFamilyDescriptor} from Flink's state meta data snapshot.
                 */
                private List<ColumnFamilyDescriptor> 
createAndRegisterColumnFamilyDescriptors(
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
+                       List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
 
                        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                                new ArrayList<>(stateMetaInfoSnapshots.size());
 
-                       for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
+                       for (StateMetaInfoSnapshot stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
 
                                ColumnFamilyDescriptor columnFamilyDescriptor = 
new ColumnFamilyDescriptor(
                                        
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
@@ -1094,7 +1088,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private void restoreLocalStateIntoFullInstance(
                        IncrementalLocalKeyedStateHandle restoreStateHandle,
                        List<ColumnFamilyDescriptor> columnFamilyDescriptors,
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) throws Exception {
+                       List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) 
throws Exception {
                        // pick up again the old backend id, so the we can 
reference existing state
                        stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
 
@@ -1120,15 +1114,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(0);
 
                        for (int i = 0; i < columnFamilyDescriptors.size(); 
++i) {
-                               RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
+                               StateMetaInfoSnapshot stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
 
                                ColumnFamilyHandle columnFamilyHandle = 
columnFamilyHandles.get(i);
                                RegisteredKeyedBackendStateMetaInfo<?, ?> 
stateMetaInfo =
-                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                               
stateMetaInfoSnapshot.getStateType(),
-                                               stateMetaInfoSnapshot.getName(),
-                                               
stateMetaInfoSnapshot.getNamespaceSerializer(),
-                                               
stateMetaInfoSnapshot.getStateSerializer());
+                                       new 
RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
 
                                stateBackend.kvStateInformation.put(
                                        stateMetaInfoSnapshot.getName(),
@@ -1177,7 +1167,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                /**
                 * Reads Flink's state meta data file from the state handle.
                 */
-               private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
+               private List<StateMetaInfoSnapshot> readMetaData(
                        StreamStateHandle metaStateHandle) throws Exception {
 
                        FSDataInputStream inputStream = null;
@@ -1299,7 +1289,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         */
        private <N, S> Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
                        StateDescriptor<?, S> stateDesc,
-                       TypeSerializer<N> namespaceSerializer) throws 
StateMigrationException, IOException {
+                       TypeSerializer<N> namespaceSerializer) throws 
StateMigrationException {
 
                Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                        kvStateInformation.get(stateDesc.getName());
@@ -1308,8 +1298,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                if (stateInfo != null) {
 
                        @SuppressWarnings("unchecked")
-                       RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfoSnapshot =
-                               
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) 
restoredKvStateMetaInfos.get(stateDesc.getName());
+                       StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
 
                        Preconditions.checkState(
                                restoredMetaInfoSnapshot != null,
@@ -1946,7 +1935,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                /**
                 * The state meta data.
                 */
-               private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots;
+               private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
                /**
                 * The copied column handle.
@@ -2291,7 +2280,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private Set<StateHandleID> baseSstFiles;
 
                /** The state meta data. */
-               private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
+               private final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots = new ArrayList<>();
 
                /** Local directory for the RocksDB native backup. */
                private SnapshotDirectory localBackupDirectory;

Reply via email to