http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index faa9314..22bb715 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Joiner;
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -34,11 +36,15 @@ import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -61,9 +67,11 @@ import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -90,6 +98,8 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import org.junit.rules.ExpectedException;
+
 
 /**
  * Generic tests for the partitioned state part of {@link 
AbstractStateBackend}.
@@ -97,6 +107,9 @@ import static org.mockito.Mockito.verify;
 @SuppressWarnings("serial")
 public abstract class StateBackendTestBase<B extends AbstractStateBackend> 
extends TestLogger {
 
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
        protected abstract B getStateBackend() throws Exception;
 
        protected CheckpointStreamFactory createStreamFactory() throws 
Exception {
@@ -171,21 +184,478 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
        @Test
        @SuppressWarnings("unchecked")
+       public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
+               // Checks that a default Kryo serializer registered on the ExecutionConfig is picked up
+               // by the backend: the registered test serializer is expected to fail with
+               // ExpectedKryoTestException, and exactly one such failure must be observed.
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               try {
+                       // cast because our test serializer is not typed to TestPojo
+                       env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
+
+                       TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+                       // make sure that we are in fact using the KryoSerializer
+                       assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+                       ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+                       ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // we will be expecting ExpectedKryoTestException to be thrown,
+                       // because the ExceptionThrowingTestSerializer should be used
+                       int numExceptions = 0;
+
+                       backend.setCurrentKey(1);
+
+                       try {
+                               // backends that eagerly serialize (such as RocksDB) will fail here
+                               state.update(new TestPojo("u1", 1));
+                       } catch (ExpectedKryoTestException e) {
+                               numExceptions++;
+                       } catch (Exception e) {
+                               // the expected failure may arrive wrapped in another exception
+                               if (e.getCause() instanceof ExpectedKryoTestException) {
+                                       numExceptions++;
+                               } else {
+                                       throw e;
+                               }
+                       }
+
+                       try {
+                               // backends that lazily serialize (such as memory state backend) will fail here
+                               runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+                       } catch (ExpectedKryoTestException e) {
+                               numExceptions++;
+                       } catch (Exception e) {
+                               // the expected failure may arrive wrapped in another exception
+                               if (e.getCause() instanceof ExpectedKryoTestException) {
+                                       numExceptions++;
+                               } else {
+                                       throw e;
+                               }
+                       }
+
+                       assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+               } finally {
+                       // release the backend's resources even if an assertion above fails
+                       backend.dispose();
+               }
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void 
testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws 
Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               // cast because our test serializer is not typed to TestPojo
+               env.getExecutionConfig()
+                               .addDefaultKryoSerializer(TestPojo.class, 
(Class) ExceptionThrowingTestSerializer.class);
+
+               TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
+
+               // make sure that we are in fact using the KryoSerializer
+               assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
+
+               pojoType.createSerializer(env.getExecutionConfig());
+
+               ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
+
+               ValueState<TestPojo> state = 
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+               assertTrue(state instanceof InternalValueState);
+               ((InternalValueState) 
state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+               // we will be expecting ExpectedKryoTestException to be thrown,
+               // because the ExceptionThrowingTestSerializer should be used
+               int numExceptions = 0;
+
+               backend.setCurrentKey(1);
+
+               try {
+                       // backends that eagerly serializes (such as RocksDB) 
will fail here
+                       state.update(new TestPojo("u1", 1));
+               } catch (ExpectedKryoTestException e) {
+                       numExceptions++;
+               } catch (Exception e) {
+                       if (e.getCause() instanceof ExpectedKryoTestException) {
+                               numExceptions++;
+                       } else {
+                               throw e;
+                       }
+               }
+
+               try {
+                       // backends that lazily serializes (such as memory 
state backend) will fail here
+                       runSnapshot(backend.snapshot(682375462378L, 2, 
streamFactory, CheckpointOptions.forFullCheckpoint()));
+               } catch (ExpectedKryoTestException e) {
+                       numExceptions++;
+               } catch (Exception e) {
+                       if (e.getCause() instanceof ExpectedKryoTestException) {
+                               numExceptions++;
+                       } else {
+                               throw e;
+                       }
+               }
+
+               assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
+       }
+
+       /**
+        * Checks that a Kryo serializer registered for a specific type on the ExecutionConfig is picked
+        * up by the backend: the registered test serializer is expected to fail with
+        * {@link ExpectedKryoTestException}, and exactly one such failure must be observed.
+        */
+       @Test
+       public void testBackendUsesRegisteredKryoSerializer() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               try {
+                       env.getExecutionConfig()
+                                       .registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+                       TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+                       // make sure that we are in fact using the KryoSerializer
+                       assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+                       ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+                       ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // we will be expecting ExpectedKryoTestException to be thrown,
+                       // because the ExceptionThrowingTestSerializer should be used
+                       int numExceptions = 0;
+
+                       backend.setCurrentKey(1);
+
+                       try {
+                               // backends that eagerly serialize (such as RocksDB) will fail here
+                               state.update(new TestPojo("u1", 1));
+                       } catch (ExpectedKryoTestException e) {
+                               numExceptions++;
+                       } catch (Exception e) {
+                               // the expected failure may arrive wrapped in another exception
+                               if (e.getCause() instanceof ExpectedKryoTestException) {
+                                       numExceptions++;
+                               } else {
+                                       throw e;
+                               }
+                       }
+
+                       try {
+                               // backends that lazily serialize (such as memory state backend) will fail here
+                               runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+                       } catch (ExpectedKryoTestException e) {
+                               numExceptions++;
+                       } catch (Exception e) {
+                               // the expected failure may arrive wrapped in another exception
+                               if (e.getCause() instanceof ExpectedKryoTestException) {
+                                       numExceptions++;
+                               } else {
+                                       throw e;
+                               }
+                       }
+
+                       assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+               } finally {
+                       // release the backend's resources even if an assertion above fails
+                       backend.dispose();
+               }
+       }
+
+       /**
+        * Same check as for a type-specific registered Kryo serializer, but the state is obtained
+        * through {@code getOrCreateKeyedState}: the registered test serializer is expected to fail
+        * with {@link ExpectedKryoTestException}, and exactly one such failure must be observed.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               try {
+                       env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+                       TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+                       // make sure that we are in fact using the KryoSerializer
+                       assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+                       ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+                       ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+                       assertTrue(state instanceof InternalValueState);
+                       ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       // we will be expecting ExpectedKryoTestException to be thrown,
+                       // because the ExceptionThrowingTestSerializer should be used
+                       int numExceptions = 0;
+
+                       backend.setCurrentKey(1);
+
+                       try {
+                               // backends that eagerly serialize (such as RocksDB) will fail here
+                               state.update(new TestPojo("u1", 1));
+                       } catch (ExpectedKryoTestException e) {
+                               numExceptions++;
+                       } catch (Exception e) {
+                               // the expected failure may arrive wrapped in another exception
+                               if (e.getCause() instanceof ExpectedKryoTestException) {
+                                       numExceptions++;
+                               } else {
+                                       throw e;
+                               }
+                       }
+
+                       try {
+                               // backends that lazily serialize (such as memory state backend) will fail here
+                               runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+                       } catch (ExpectedKryoTestException e) {
+                               numExceptions++;
+                       } catch (Exception e) {
+                               // the expected failure may arrive wrapped in another exception
+                               if (e.getCause() instanceof ExpectedKryoTestException) {
+                                       numExceptions++;
+                               } else {
+                                       throw e;
+                               }
+                       }
+
+                       assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+               } finally {
+                       // release the backend's resources even if an assertion above fails
+                       backend.dispose();
+               }
+       }
+
+
+       /**
+        * Verify state restore resilience when:
+        *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+        *  - restored with the state type registered (no specific serializer)
+        *
+        * <p>This test should not fail, because de- / serialization of the state should not be performed with Kryo's default
+        * {@link com.esotericsoftware.kryo.serializers.FieldSerializer}.
+        */
+       @Test
+       public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+               ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+               KeyGroupsStateHandle snapshot;
+
+               // ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+               try {
+                       // make sure that we are in fact using the KryoSerializer
+                       assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+                       ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // make some more modifications
+                       backend.setCurrentKey(1);
+                       state.update(new TestPojo("u1", 1));
+
+                       backend.setCurrentKey(2);
+                       state.update(new TestPojo("u2", 2));
+
+                       snapshot = runSnapshot(backend.snapshot(
+                                       682375462378L,
+                                       2,
+                                       streamFactory,
+                                       CheckpointOptions.forFullCheckpoint()));
+               } finally {
+                       // the first backend is no longer needed once the snapshot is taken (or has failed)
+                       backend.dispose();
+               }
+
+               // ====================================== restore snapshot  ======================================
+
+               env.getExecutionConfig().registerKryoType(TestPojo.class);
+
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+               try {
+                       snapshot.discardState();
+
+                       ValueState<TestPojo> restoredState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // expected value goes first so that a failure message reads correctly
+                       backend.setCurrentKey(1);
+                       assertEquals(new TestPojo("u1", 1), restoredState.value());
+
+                       backend.setCurrentKey(2);
+                       assertEquals(new TestPojo("u2", 2), restoredState.value());
+               } finally {
+                       // release the restored backend even if an assertion above fails
+                       backend.dispose();
+               }
+       }
+
+       /**
+        * Verify state restore resilience when:
+        *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+        *  - restored with a default serializer for the state type
+        *
+        * <p>The default serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+        * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+        *
+        * <p>Every backend that was created or successfully restored is disposed in a {@code finally} block,
+        * including the last one, whose use is expected to end with an exception.
+        *
+        * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+               ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+               ValueState<TestPojo> state;
+               KeyGroupsStateHandle snapshot;
+
+               // ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+               try {
+                       // make sure that we are in fact using the KryoSerializer
+                       assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+                       state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // make some more modifications
+                       backend.setCurrentKey(1);
+                       state.update(new TestPojo("u1", 1));
+
+                       backend.setCurrentKey(2);
+                       state.update(new TestPojo("u2", 2));
+
+                       snapshot = runSnapshot(backend.snapshot(
+                                       682375462378L,
+                                       2,
+                                       streamFactory,
+                                       CheckpointOptions.forFullCheckpoint()));
+               } finally {
+                       backend.dispose();
+               }
+
+               // ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
+
+               // cast because our test serializer is not typed to TestPojo
+               env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+               KeyGroupsStateHandle snapshot2;
+               try {
+                       snapshot.discardState();
+
+                       // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+                       // initializeSerializerUnlessSet would not pick up our new config
+                       kvId = new ValueStateDescriptor<>("id", pojoType);
+                       state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       backend.setCurrentKey(1);
+
+                       // update to test state backends that eagerly serialize, such as RocksDB
+                       state.update(new TestPojo("u1", 11));
+
+                       snapshot2 = runSnapshot(backend.snapshot(
+                                       682375462378L,
+                                       2,
+                                       streamFactory,
+                                       CheckpointOptions.forFullCheckpoint()));
+               } finally {
+                       backend.dispose();
+               }
+
+               // ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) =========
+
+               // cast because our test serializer is not typed to TestPojo
+               env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+               // on the second restore, since the custom serializer will be used for
+               // deserialization, we expect the deliberate failure to be thrown
+               expectedException.expect(ExpectedKryoTestException.class);
+
+               // state backends that eagerly deserialize (such as the memory state backend) will fail here;
+               // in that case there is no new backend to dispose
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+               try {
+                       state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       backend.setCurrentKey(1);
+                       // state backends that lazily deserialize (such as RocksDB) will fail here
+                       state.value();
+               } finally {
+                       // the expected exception aborts the test, so dispose here
+                       backend.dispose();
+               }
+       }
+
+       /**
+        * Verify state restore resilience when:
+        *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+        *  - restored with a specific serializer for the state type
+        *
+        * <p>The specific serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+        * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+        *
+        * <p>Every backend that was created or successfully restored is disposed in a {@code finally} block,
+        * including the last one, whose use is expected to end with an exception.
+        *
+        * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+        */
+       @Test
+       public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+               ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+               ValueState<TestPojo> state;
+               KeyGroupsStateHandle snapshot;
+
+               // ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+               try {
+                       // make sure that we are in fact using the KryoSerializer
+                       assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+                       state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // make some more modifications
+                       backend.setCurrentKey(1);
+                       state.update(new TestPojo("u1", 1));
+
+                       backend.setCurrentKey(2);
+                       state.update(new TestPojo("u2", 2));
+
+                       snapshot = runSnapshot(backend.snapshot(
+                                       682375462378L,
+                                       2,
+                                       streamFactory,
+                                       CheckpointOptions.forFullCheckpoint()));
+               } finally {
+                       backend.dispose();
+               }
+
+               // ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
+
+               env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+               KeyGroupsStateHandle snapshot2;
+               try {
+                       snapshot.discardState();
+
+                       // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+                       // initializeSerializerUnlessSet would not pick up our new config
+                       kvId = new ValueStateDescriptor<>("id", pojoType);
+                       state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       backend.setCurrentKey(1);
+
+                       // update to test state backends that eagerly serialize, such as RocksDB
+                       state.update(new TestPojo("u1", 11));
+
+                       snapshot2 = runSnapshot(backend.snapshot(
+                                       682375462378L,
+                                       2,
+                                       streamFactory,
+                                       CheckpointOptions.forFullCheckpoint()));
+               } finally {
+                       backend.dispose();
+               }
+
+               // ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========
+
+               env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+               // on the second restore, since the custom serializer will be used for
+               // deserialization, we expect the deliberate failure to be thrown
+               expectedException.expect(ExpectedKryoTestException.class);
+
+               // state backends that eagerly deserialize (such as the memory state backend) will fail here;
+               // in that case there is no new backend to dispose
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+               try {
+                       state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       backend.setCurrentKey(1);
+                       // state backends that lazily deserialize (such as RocksDB) will fail here
+                       state.value();
+               } finally {
+                       // the expected exception aborts the test, so dispose here
+                       backend.dispose();
+               }
+       }
+
+
+       @Test
+       @SuppressWarnings("unchecked")
        public void testValueState() throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
                TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
-               TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
                ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                @SuppressWarnings("unchecked")
                InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
+               // this is only available after the backend initialized the 
serializer
+               TypeSerializer<String> valueSerializer = kvId.getSerializer();
+               
                // some modifications to the state
                backend.setCurrentKey(1);
                assertNull(state.value());
@@ -276,16 +746,17 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                final ValueStateDescriptor<String> kvId =
                        new ValueStateDescriptor<>("id", String.class);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                final TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
                final TypeSerializer<Integer> namespaceSerializer =
                        IntSerializer.INSTANCE;
-               final TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
 
                final ValueState<String> state = backend
                        .getPartitionedState(namespace, IntSerializer.INSTANCE, 
kvId);
 
+               // this is only available after the backend initialized the 
serializer
+               final TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
+
                @SuppressWarnings("unchecked")
                final InternalKvState<Integer> kvState = 
(InternalKvState<Integer>) state;
 
@@ -390,9 +861,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                ValueStateDescriptor<String> desc1 = new 
ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE);
                ValueStateDescriptor<Integer> desc2 = new 
ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE);
 
-               desc1.initializeSerializerUnlessSet(new ExecutionConfig());
-               desc2.initializeSerializerUnlessSet(new ExecutionConfig());
-
                ValueState<String> state1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc1);
                ValueState<Integer> state2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc2);
 
@@ -459,7 +927,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<Long> kvId = new 
ValueStateDescriptor<>("id", LongSerializer.INSTANCE, 42L);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ValueState<Long> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -499,463 +966,443 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
        @Test
        @SuppressWarnings("unchecked,rawtypes")
-       public void testListState() {
-               try {
-                       CheckpointStreamFactory streamFactory = 
createStreamFactory();
-                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+       public void testListState() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
-                       ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
-                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
 
-                       TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
-                       TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
-                       TypeSerializer<String> valueSerializer = 
kvId.getElementSerializer();
+               TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+               TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
 
-                       ListState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
+               ListState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
-                       Joiner joiner = Joiner.on(",");
-                       // some modifications to the state
-                       backend.setCurrentKey(1);
-                       assertEquals(null, state.get());
-                       assertEquals(null, getSerializedList(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       state.add("1");
-                       backend.setCurrentKey(2);
-                       assertEquals(null, state.get());
-                       assertEquals(null, getSerializedList(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       state.add("2");
-                       backend.setCurrentKey(1);
-                       assertEquals("1", joiner.join(state.get()));
-                       assertEquals("1", 
joiner.join(getSerializedList(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               // this is only available after the backend initialized the 
serializer
+               TypeSerializer<String> valueSerializer = 
kvId.getElementSerializer();
 
-                       // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               Joiner joiner = Joiner.on(",");
+               // some modifications to the state
+               backend.setCurrentKey(1);
+               assertEquals(null, state.get());
+               assertEquals(null, getSerializedList(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               state.add("1");
+               backend.setCurrentKey(2);
+               assertEquals(null, state.get());
+               assertEquals(null, getSerializedList(kvState, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               state.add("2");
+               backend.setCurrentKey(1);
+               assertEquals("1", joiner.join(state.get()));
+               assertEquals("1", joiner.join(getSerializedList(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-                       // make some more modifications
-                       backend.setCurrentKey(1);
-                       state.add("u1");
-                       backend.setCurrentKey(2);
-                       state.add("u2");
-                       backend.setCurrentKey(3);
-                       state.add("u3");
+               // draw a snapshot
+               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               // make some more modifications
+               backend.setCurrentKey(1);
+               state.add("u1");
+               backend.setCurrentKey(2);
+               state.add("u2");
+               backend.setCurrentKey(3);
+               state.add("u3");
 
-                       // validate the original state
-                       backend.setCurrentKey(1);
-                       assertEquals("1,u1", joiner.join(state.get()));
-                       assertEquals("1,u1", 
joiner.join(getSerializedList(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-                       backend.setCurrentKey(2);
-                       assertEquals("2,u2", joiner.join(state.get()));
-                       assertEquals("2,u2", 
joiner.join(getSerializedList(kvState, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-                       backend.setCurrentKey(3);
-                       assertEquals("u3", joiner.join(state.get()));
-                       assertEquals("u3", 
joiner.join(getSerializedList(kvState, 3, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               // draw another snapshot
+               KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       backend.dispose();
-                       // restore the first snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
-                       snapshot1.discardState();
+               // validate the original state
+               backend.setCurrentKey(1);
+               assertEquals("1,u1", joiner.join(state.get()));
+               assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               backend.setCurrentKey(2);
+               assertEquals("2,u2", joiner.join(state.get()));
+               assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               backend.setCurrentKey(3);
+               assertEquals("u3", joiner.join(state.get()));
+               assertEquals("u3", joiner.join(getSerializedList(kvState, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-                       ListState<String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
+               backend.dispose();
+               // restore the first snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
+               snapshot1.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("1", joiner.join(restored1.get()));
-                       assertEquals("1", 
joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-                       backend.setCurrentKey(2);
-                       assertEquals("2", joiner.join(restored1.get()));
-                       assertEquals("2", 
joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               ListState<String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
-                       backend.dispose();
-                       // restore the second snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
-                       snapshot2.discardState();
+               backend.setCurrentKey(1);
+               assertEquals("1", joiner.join(restored1.get()));
+               assertEquals("1", 
joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               backend.setCurrentKey(2);
+               assertEquals("2", joiner.join(restored1.get()));
+               assertEquals("2", 
joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-                       ListState<String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
+               backend.dispose();
+               // restore the second snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
+               snapshot2.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("1,u1", joiner.join(restored2.get()));
-                       assertEquals("1,u1", 
joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-                       backend.setCurrentKey(2);
-                       assertEquals("2,u2", joiner.join(restored2.get()));
-                       assertEquals("2,u2", 
joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-                       backend.setCurrentKey(3);
-                       assertEquals("u3", joiner.join(restored2.get()));
-                       assertEquals("u3", 
joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               ListState<String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
-                       backend.dispose();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               backend.setCurrentKey(1);
+               assertEquals("1,u1", joiner.join(restored2.get()));
+               assertEquals("1,u1", 
joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               backend.setCurrentKey(2);
+               assertEquals("2,u2", joiner.join(restored2.get()));
+               assertEquals("2,u2", 
joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+               backend.setCurrentKey(3);
+               assertEquals("u3", joiner.join(restored2.get()));
+               assertEquals("u3", 
joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
+               backend.dispose();
        }
 
        @Test
        @SuppressWarnings("unchecked")
-       public void testReducingState() {
-               try {
-                       CheckpointStreamFactory streamFactory = 
createStreamFactory();
-                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+       public void testReducingState() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
-                       ReducingStateDescriptor<String> kvId = new 
ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
-                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               ReducingStateDescriptor<String> kvId = new 
ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
 
-                       TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
-                       TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
-                       TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
+               TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+               TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
 
-                       ReducingState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
+               ReducingState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
-                       // some modifications to the state
-                       backend.setCurrentKey(1);
-                       assertEquals(null, state.get());
-                       assertNull(getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       state.add("1");
-                       backend.setCurrentKey(2);
-                       assertEquals(null, state.get());
-                       assertNull(getSerializedValue(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       state.add("2");
-                       backend.setCurrentKey(1);
-                       assertEquals("1", state.get());
-                       assertEquals("1", getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               // this is only available after the backend initialized the 
serializer
+               TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
-                       // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               // some modifications to the state
+               backend.setCurrentKey(1);
+               assertEquals(null, state.get());
+               assertNull(getSerializedValue(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               state.add("1");
+               backend.setCurrentKey(2);
+               assertEquals(null, state.get());
+               assertNull(getSerializedValue(kvState, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               state.add("2");
+               backend.setCurrentKey(1);
+               assertEquals("1", state.get());
+               assertEquals("1", getSerializedValue(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-                       // make some more modifications
-                       backend.setCurrentKey(1);
-                       state.add("u1");
-                       backend.setCurrentKey(2);
-                       state.add("u2");
-                       backend.setCurrentKey(3);
-                       state.add("u3");
+               // draw a snapshot
+               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               // make some more modifications
+               backend.setCurrentKey(1);
+               state.add("u1");
+               backend.setCurrentKey(2);
+               state.add("u2");
+               backend.setCurrentKey(3);
+               state.add("u3");
 
-                       // validate the original state
-                       backend.setCurrentKey(1);
-                       assertEquals("1,u1", state.get());
-                       assertEquals("1,u1", getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("2,u2", state.get());
-                       assertEquals("2,u2", getSerializedValue(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(3);
-                       assertEquals("u3", state.get());
-                       assertEquals("u3", getSerializedValue(kvState, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               // draw another snapshot
+               KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       backend.dispose();
-                       // restore the first snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
-                       snapshot1.discardState();
+               // validate the original state
+               backend.setCurrentKey(1);
+               assertEquals("1,u1", state.get());
+               assertEquals("1,u1", getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("2,u2", state.get());
+               assertEquals("2,u2", getSerializedValue(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(3);
+               assertEquals("u3", state.get());
+               assertEquals("u3", getSerializedValue(kvState, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-                       ReducingState<String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
+               backend.dispose();
+               // restore the first snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
+               snapshot1.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("1", restored1.get());
-                       assertEquals("1", getSerializedValue(restoredKvState1, 
1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
valueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("2", restored1.get());
-                       assertEquals("2", getSerializedValue(restoredKvState1, 
2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
valueSerializer));
+               ReducingState<String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
-                       backend.dispose();
-                       // restore the second snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
-                       snapshot2.discardState();
+               backend.setCurrentKey(1);
+               assertEquals("1", restored1.get());
+               assertEquals("1", getSerializedValue(restoredKvState1, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("2", restored1.get());
+               assertEquals("2", getSerializedValue(restoredKvState1, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-                       ReducingState<String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
+               backend.dispose();
+               // restore the second snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
+               snapshot2.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("1,u1", restored2.get());
-                       assertEquals("1,u1", 
getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("2,u2", restored2.get());
-                       assertEquals("2,u2", 
getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(3);
-                       assertEquals("u3", restored2.get());
-                       assertEquals("u3", getSerializedValue(restoredKvState2, 
3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
valueSerializer));
+               ReducingState<String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
-                       backend.dispose();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               backend.setCurrentKey(1);
+               assertEquals("1,u1", restored2.get());
+               assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("2,u2", restored2.get());
+               assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(3);
+               assertEquals("u3", restored2.get());
+               assertEquals("u3", getSerializedValue(restoredKvState2, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+
+               backend.dispose();
        }
 
        @Test
        @SuppressWarnings("unchecked,rawtypes")
-       public void testFoldingState() {
-               try {
-                       CheckpointStreamFactory streamFactory = 
createStreamFactory();
-                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+       public void testFoldingState() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
-                       FoldingStateDescriptor<Integer, String> kvId = new 
FoldingStateDescriptor<>("id",
-                                       "Fold-Initial:",
-                                       new AppendingFold(),
-                                       String.class);
-                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               FoldingStateDescriptor<Integer, String> kvId = new 
FoldingStateDescriptor<>("id",
+                               "Fold-Initial:",
+                               new AppendingFold(),
+                               String.class);
 
-                       TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
-                       TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
-                       TypeSerializer<String> valueSerializer = 
kvId.getSerializer();
+               TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+               TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
 
-                       FoldingState<Integer, String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
+               FoldingState<Integer, String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
-                       // some modifications to the state
-                       backend.setCurrentKey(1);
-                       assertEquals(null, state.get());
-                       assertEquals(null, getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       state.add(1);
-                       backend.setCurrentKey(2);
-                       assertEquals(null, state.get());
-                       assertEquals(null, getSerializedValue(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-                       state.add(2);
-                       backend.setCurrentKey(1);
-                       assertEquals("Fold-Initial:,1", state.get());
-                       assertEquals("Fold-Initial:,1", 
getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+               // this is only available after the backend initialized the 
serializer
+               TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
-                       // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               // some modifications to the state
+               backend.setCurrentKey(1);
+               assertEquals(null, state.get());
+               assertEquals(null, getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               state.add(1);
+               backend.setCurrentKey(2);
+               assertEquals(null, state.get());
+               assertEquals(null, getSerializedValue(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+               state.add(2);
+               backend.setCurrentKey(1);
+               assertEquals("Fold-Initial:,1", state.get());
+               assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-                       // make some more modifications
-                       backend.setCurrentKey(1);
-                       state.clear();
-                       state.add(101);
-                       backend.setCurrentKey(2);
-                       state.add(102);
-                       backend.setCurrentKey(3);
-                       state.add(103);
+               // draw a snapshot
+               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               // make some more modifications
+               backend.setCurrentKey(1);
+               state.clear();
+               state.add(101);
+               backend.setCurrentKey(2);
+               state.add(102);
+               backend.setCurrentKey(3);
+               state.add(103);
 
-                       // validate the original state
-                       backend.setCurrentKey(1);
-                       assertEquals("Fold-Initial:,101", state.get());
-                       assertEquals("Fold-Initial:,101", 
getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("Fold-Initial:,2,102", state.get());
-                       assertEquals("Fold-Initial:,2,102", 
getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(3);
-                       assertEquals("Fold-Initial:,103", state.get());
-                       assertEquals("Fold-Initial:,103", 
getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+               // draw another snapshot
+               KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       backend.dispose();
-                       // restore the first snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
-                       snapshot1.discardState();
+               // validate the original state
+               backend.setCurrentKey(1);
+               assertEquals("Fold-Initial:,101", state.get());
+               assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 
1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
valueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("Fold-Initial:,2,102", state.get());
+               assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 
2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
valueSerializer));
+               backend.setCurrentKey(3);
+               assertEquals("Fold-Initial:,103", state.get());
+               assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 
3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
valueSerializer));
 
-                       FoldingState<Integer, String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
+               backend.dispose();
+               // restore the first snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
+               snapshot1.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("Fold-Initial:,1", restored1.get());
-                       assertEquals("Fold-Initial:,1", 
getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("Fold-Initial:,2", restored1.get());
-                       assertEquals("Fold-Initial:,2", 
getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+               FoldingState<Integer, String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
-                       backend.dispose();
-                       // restore the second snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
-                       snapshot1.discardState();
+               backend.setCurrentKey(1);
+               assertEquals("Fold-Initial:,1", restored1.get());
+               assertEquals("Fold-Initial:,1", 
getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("Fold-Initial:,2", restored1.get());
+               assertEquals("Fold-Initial:,2", 
getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
 
-                       @SuppressWarnings("unchecked")
-                       FoldingState<Integer, String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
+               backend.dispose();
+               // restore the second snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
+               snapshot2.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("Fold-Initial:,101", restored2.get());
-                       assertEquals("Fold-Initial:,101", 
getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("Fold-Initial:,2,102", restored2.get());
-                       assertEquals("Fold-Initial:,2,102", 
getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
-                       backend.setCurrentKey(3);
-                       assertEquals("Fold-Initial:,103", restored2.get());
-                       assertEquals("Fold-Initial:,103", 
getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+               @SuppressWarnings("unchecked")
+               FoldingState<Integer, String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
-                       backend.dispose();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               backend.setCurrentKey(1);
+               assertEquals("Fold-Initial:,101", restored2.get());
+               assertEquals("Fold-Initial:,101", 
getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("Fold-Initial:,2,102", restored2.get());
+               assertEquals("Fold-Initial:,2,102", 
getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+               backend.setCurrentKey(3);
+               assertEquals("Fold-Initial:,103", restored2.get());
+               assertEquals("Fold-Initial:,103", 
getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
+
+               backend.dispose();
        }
 
        @Test
        @SuppressWarnings("unchecked,rawtypes")
-       public void testMapState() {
-               try {
-                       CheckpointStreamFactory streamFactory = 
createStreamFactory();
-                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+       public void testMapState() throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
-                       MapStateDescriptor<Integer, String> kvId = new 
MapStateDescriptor<>("id", Integer.class, String.class);
-                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               MapStateDescriptor<Integer, String> kvId = new 
MapStateDescriptor<>("id", Integer.class, String.class);
 
-                       TypeSerializer<Integer> keySerializer = 
IntSerializer.INSTANCE;
-                       TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
-                       TypeSerializer<Integer> userKeySerializer = 
kvId.getKeySerializer();
-                       TypeSerializer<String> userValueSerializer = 
kvId.getValueSerializer();
+               TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+               TypeSerializer<VoidNamespace> namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
 
-                       MapState<Integer, String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
+               MapState<Integer, String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
-                       // some modifications to the state
-                       backend.setCurrentKey(1);
-                       assertEquals(null, state.get(1));
-                       assertEquals(null, getSerializedMap(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-                       state.put(1, "1");
-                       backend.setCurrentKey(2);
-                       assertEquals(null, state.get(2));
-                       assertEquals(null, getSerializedMap(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-                       state.put(2, "2");
-                       backend.setCurrentKey(1);
-                       assertTrue(state.contains(1));
-                       assertEquals("1", state.get(1));
-                       assertEquals(new HashMap<Integer, String>() {{ put (1, 
"1"); }},
-                                       getSerializedMap(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               // these are only available after the backend initialized the 
serializer
+               TypeSerializer<Integer> userKeySerializer = 
kvId.getKeySerializer();
+               TypeSerializer<String> userValueSerializer = 
kvId.getValueSerializer();
 
-                       // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               // some modifications to the state
+               backend.setCurrentKey(1);
+               assertEquals(null, state.get(1));
+               assertEquals(null, getSerializedMap(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               state.put(1, "1");
+               backend.setCurrentKey(2);
+               assertEquals(null, state.get(2));
+               assertEquals(null, getSerializedMap(kvState, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               state.put(2, "2");
+               backend.setCurrentKey(1);
+               assertTrue(state.contains(1));
+               assertEquals("1", state.get(1));
+               assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+                               getSerializedMap(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
 
-                       // make some more modifications
-                       backend.setCurrentKey(1);
-                       state.put(1, "101");
-                       backend.setCurrentKey(2);
-                       state.put(102, "102");
-                       backend.setCurrentKey(3);
-                       state.put(103, "103");
-                       state.putAll(new HashMap<Integer, String>() {{ 
put(1031, "1031"); put(1032, "1032"); }});
+               // draw a snapshot
+               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+               // make some more modifications
+               backend.setCurrentKey(1);
+               state.put(1, "101");
+               backend.setCurrentKey(2);
+               state.put(102, "102");
+               backend.setCurrentKey(3);
+               state.put(103, "103");
+               state.putAll(new HashMap<Integer, String>() {{ put(1031, 
"1031"); put(1032, "1032"); }});
 
-                       // validate the original state
-                       backend.setCurrentKey(1);
-                       assertEquals("101", state.get(1));
-                       assertEquals(new HashMap<Integer, String>() {{ put(1, 
"101"); }},
-                                       getSerializedMap(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("102", state.get(102));
-                       assertEquals(new HashMap<Integer, String>() {{ put(2, 
"2"); put(102, "102"); }},
-                                       getSerializedMap(kvState, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-                       backend.setCurrentKey(3);
-                       assertTrue(state.contains(103));
-                       assertEquals("103", state.get(103));
-                       assertEquals(new HashMap<Integer, String>() {{ put(103, 
"103"); put(1031, "1031"); put(1032, "1032"); }},
-                                       getSerializedMap(kvState, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-
-                       List<Integer> keys = new ArrayList<>();
-                       for (Integer key : state.keys()) {
-                               keys.add(key);
-                       }
-                       List<Integer> expectedKeys = new ArrayList<Integer>() 
{{ add(103); add(1031); add(1032); }};
-                       assertEquals(keys.size(), expectedKeys.size());
-                       keys.removeAll(expectedKeys);
-                       assertTrue(keys.isEmpty());
-
-                       List<String> values = new ArrayList<>();
-                       for (String value : state.values()) {
-                               values.add(value);
-                       }
-                       List<String> expectedValues = new ArrayList<String>() 
{{ add("103"); add("1031"); add("1032"); }};
-                       assertEquals(values.size(), expectedValues.size());
-                       values.removeAll(expectedValues);
-                       assertTrue(values.isEmpty());
+               // draw another snapshot
+               KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
 
-                       // make some more modifications
-                       backend.setCurrentKey(1);
-                       state.clear();
-                       backend.setCurrentKey(2);
-                       state.remove(102);
-                       backend.setCurrentKey(3);
-                       final String updateSuffix = "_updated";
-                       Iterator<Map.Entry<Integer, String>> iterator = 
state.iterator();
-                       while (iterator.hasNext()) {
-                               Map.Entry<Integer, String> entry = 
iterator.next();
-                               if (entry.getValue().length() != 4) {
-                                       iterator.remove();
-                               } else {
-                                       entry.setValue(entry.getValue() + 
updateSuffix);
-                               }
-                       }
+               // validate the original state
+               backend.setCurrentKey(1);
+               assertEquals("101", state.get(1));
+               assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); 
}},
+                               getSerializedMap(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("102", state.get(102));
+               assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); 
put(102, "102"); }},
+                               getSerializedMap(kvState, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               backend.setCurrentKey(3);
+               assertTrue(state.contains(103));
+               assertEquals("103", state.get(103));
+               assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); 
put(1031, "1031"); put(1032, "1032"); }},
+                               getSerializedMap(kvState, 3, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+
+               List<Integer> keys = new ArrayList<>();
+               for (Integer key : state.keys()) {
+                       keys.add(key);
+               }
+               List<Integer> expectedKeys = new ArrayList<Integer>() {{ 
add(103); add(1031); add(1032); }};
+               assertEquals(expectedKeys.size(), keys.size());
+               keys.removeAll(expectedKeys);
+               assertTrue(keys.isEmpty());
+
+               List<String> values = new ArrayList<>();
+               for (String value : state.values()) {
+                       values.add(value);
+               }
+               List<String> expectedValues = new ArrayList<String>() {{ 
add("103"); add("1031"); add("1032"); }};
+               assertEquals(expectedValues.size(), values.size());
+               values.removeAll(expectedValues);
+               assertTrue(values.isEmpty());
 
-                       // validate the state
-                       backend.setCurrentKey(1);
-                       backend.setCurrentKey(2);
-                       assertFalse(state.contains(102));
-                       backend.setCurrentKey(3);
-                       for (Map.Entry<Integer, String> entry : 
state.entries()) {
-                               assertEquals(4 + updateSuffix.length(), 
entry.getValue().length());
-                               
assertTrue(entry.getValue().endsWith(updateSuffix));
+               // make some more modifications
+               backend.setCurrentKey(1);
+               state.clear();
+               backend.setCurrentKey(2);
+               state.remove(102);
+               backend.setCurrentKey(3);
+               final String updateSuffix = "_updated";
+               Iterator<Map.Entry<Integer, String>> iterator = 
state.iterator();
+               while (iterator.hasNext()) {
+                       Map.Entry<Integer, String> entry = iterator.next();
+                       if (entry.getValue().length() != 4) {
+                               iterator.remove();
+                       } else {
+                               entry.setValue(entry.getValue() + updateSuffix);
                        }
+               }
 
-                       backend.dispose();
-                       // restore the first snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
-                       snapshot1.discardState();
+               // validate the state
+               backend.setCurrentKey(1);
+               backend.setCurrentKey(2);
+               assertFalse(state.contains(102));
+               backend.setCurrentKey(3);
+               for (Map.Entry<Integer, String> entry : state.entries()) {
+                       assertEquals(4 + updateSuffix.length(), 
entry.getValue().length());
+                       assertTrue(entry.getValue().endsWith(updateSuffix));
+               }
 
-                       MapState<Integer, String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
+               backend.dispose();
+               // restore the first snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
+               snapshot1.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("1", restored1.get(1));
-                       assertEquals(new HashMap<Integer, String>() {{ put (1, 
"1"); }},
-                                       getSerializedMap(restoredKvState1, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("2", restored1.get(2));
-                       assertEquals(new HashMap<Integer, String>() {{ put (2, 
"2"); }},
-                                       getSerializedMap(restoredKvState1, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               MapState<Integer, String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
-                       backend.dispose();
-                       // restore the second snapshot and validate it
-                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
-                       snapshot2.discardState();
+               backend.setCurrentKey(1);
+               assertEquals("1", restored1.get(1));
+               assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+                               getSerializedMap(restoredKvState1, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("2", restored1.get(2));
+               assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
+                               getSerializedMap(restoredKvState1, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
 
-                       @SuppressWarnings("unchecked")
-                       MapState<Integer, String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-                       @SuppressWarnings("unchecked")
-                       InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
+               backend.dispose();
+               // restore the second snapshot and validate it
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2);
+               snapshot2.discardState();
 
-                       backend.setCurrentKey(1);
-                       assertEquals("101", restored2.get(1));
-                       assertEquals(new HashMap<Integer, String>() {{ put (1, 
"101"); }},
-                                       getSerializedMap(restoredKvState2, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-                       backend.setCurrentKey(2);
-                       assertEquals("102", restored2.get(102));
-                       assertEquals(new HashMap<Integer, String>() {{ put(2, 
"2"); put (102, "102"); }},
-                                       getSerializedMap(restoredKvState2, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
-                       backend.setCurrentKey(3);
-                       assertEquals("103", restored2.get(103));
-                       assertEquals(new HashMap<Integer, String>() {{ put(103, 
"103"); put(1031, "1031"); put(1032, "1032"); }},
-                                       getSerializedMap(restoredKvState2, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               @SuppressWarnings("unchecked")
+               MapState<Integer, String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+               @SuppressWarnings("unchecked")
+               InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
-                       backend.dispose();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               backend.setCurrentKey(1);
+               assertEquals("101", restored2.get(1));
+               assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); 
}},
+                               getSerializedMap(restoredKvState2, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               backend.setCurrentKey(2);
+               assertEquals("102", restored2.get(102));
+               assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put 
(102, "102"); }},
+                               getSerializedMap(restoredKvState2, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
+               backend.setCurrentKey(3);
+               assertEquals("103", restored2.get(103));
+               assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); 
put(1031, "1031"); put(1032, "1032"); }},
+                               getSerializedMap(restoredKvState2, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, 
userValueSerializer));
 
+               backend.dispose();
        }
 
        /**
@@ -966,7 +1413,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -991,7 +1437,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, "Hello");
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1015,7 +1460,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ReducingStateDescriptor<String> kvId = new 
ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ReducingState<String> state = backend.getPartitionedState(
                                VoidNamespace.INSTANCE,
@@ -1043,8 +1487,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                FoldingStateDescriptor<Integer, String> kvId =
                                new FoldingStateDescriptor<>("id", 
"Fold-Initial:", new AppendingFold(), String.class);
 
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
                FoldingState<Integer, String> state = 
backend.getPartitionedState(
                                VoidNamespace.INSTANCE,
                                VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1071,7 +1513,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ListState<String> state = backend.getPartitionedState(
                                VoidNamespace.INSTANCE,
@@ -1098,7 +1539,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                MapStateDescriptor<String, String> kvId = new 
MapStateDescriptor<>("id", String.class, String.class);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                MapState<String, String> state = backend.getPartitionedState(
                                VoidNamespace.INSTANCE,
@@ -1142,7 +1582,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                new DummyEnvironment("test", 1, 0));
 
                ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1224,7 +1663,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                        ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
-                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                        ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1430,7 +1868,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<IntValue> kvId = new 
ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ValueState<IntValue> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1458,7 +1895,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<IntValue> kvId = new 
ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                try {
                        backend.getPartitionedState(null, 
VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1501,7 +1937,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                        Integer.class,
                                        -1);
                        desc.setQueryable("my-query");
-                       desc.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                        ValueState<Integer> state = backend.getPartitionedState(
                                        VoidNamespace.INSTANCE,
@@ -1524,7 +1959,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        // ListState
                        ListStateDescriptor<Integer> desc = new 
ListStateDescriptor<>("list-state", Integer.class);
                        desc.setQueryable("my-query");
-                       desc.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                        ListState<Integer> state = backend.getPartitionedState(
                                        VoidNamespace.INSTANCE,
@@ -1552,7 +1986,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                }
                        }, Integer.class);
                        desc.setQueryable("my-query");
-                       desc.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                        ReducingState<Integer> state = 
backend.getPartitionedState(
                                        VoidNamespace.INSTANCE,
@@ -1580,7 +2013,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                }
                        }, Integer.class);
                        desc.setQueryable("my-query");
-                       desc.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                        FoldingState<Integer, Integer> state = 
backend.getPartitionedState(
                                        VoidNamespace.INSTANCE,
@@ -1602,7 +2034,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        // MapState
                        MapStateDescriptor<Integer, String> desc = new 
MapStateDescriptor<>("map-state", Integer.class, String.class);
                        desc.setQueryable("my-query");
-                       desc.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                        MapState<Integer, String> state = 
backend.getPartitionedState(
                                        VoidNamespace.INSTANCE,
@@ -1935,4 +2366,107 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        return 
KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, 
userValueSerializer);
                }
        }
+
+       private KeyGroupsStateHandle 
runSnapshot(RunnableFuture<KeyGroupsStateHandle> snapshotRunnableFuture) throws 
Exception {
+               if(!snapshotRunnableFuture.isDone()) {
+                       Thread runner = new Thread(snapshotRunnableFuture);
+                       runner.start();
+               }
+               return snapshotRunnableFuture.get();
+       }
+
+       private static class TestPojo implements Serializable {
+               private String strField;
+               private Integer intField;
+
+               public TestPojo() {}
+
+               public TestPojo(String strField, Integer intField) {
+                       this.strField = strField;
+                       this.intField = intField;
+               }
+
+               public String getStrField() {
+                       return strField;
+               }
+
+               public void setStrField(String strField) {
+                       this.strField = strField;
+               }
+
+               public Integer getIntField() {
+                       return intField;
+               }
+
+               public void setIntField(Integer intField) {
+                       this.intField = intField;
+               }
+
+               @Override
+               public String toString() {
+                       return "TestPojo{" +
+                                       "strField='" + strField + '\'' +
+                                       ", intField=" + intField +
+                                       '}';
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) return true;
+                       if (o == null || getClass() != o.getClass()) return 
false;
+
+                       TestPojo testPojo = (TestPojo) o;
+
+                       if (!strField.equals(testPojo.strField)) return false;
+                       return intField.equals(testPojo.intField);
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = strField.hashCode();
+                       result = 31 * result + intField.hashCode();
+                       return result;
+               }
+       }
+
+       /**
+        * Thrown by {@link ExceptionThrowingTestSerializer} and by {@link CustomKryoTestSerializer#read}.
+        */
+       private static class ExpectedKryoTestException extends RuntimeException 
{}
+
+       /**
+        * Kryo {@code Serializer} that throws an expected exception. We use 
this to ensure
+        * that the state backend correctly uses a specified Kryo serializer.
+        */
+       public static class ExceptionThrowingTestSerializer extends 
JavaSerializer {
+               @Override
+               public void write(Kryo kryo, Output output, Object object) {
+                       throw new ExpectedKryoTestException();
+               }
+
+               @Override
+               public Object read(Kryo kryo, Input input, Class type) {
+                       throw new ExpectedKryoTestException();
+               }
+       }
+
+       /**
+        * Our custom version of {@link JavaSerializer} for checking whether 
restore with a registered
+        * serializer works when no serializer was previously registered.
+        *
+        * <p>This {@code Serializer} can only be used for writing, not for 
reading. With this we
+        * verify that state that was serialized without a registered {@code 
Serializer} is in fact
+        * not restored with a {@code Serializer} that was later registered.
+        */
+       public static class CustomKryoTestSerializer extends JavaSerializer {
+               @Override
+               public void write(Kryo kryo, Output output, Object object) {
+                       super.write(kryo, output, object);
+               }
+
+               @Override
+               public Object read(Kryo kryo, Input input, Class type) {
+                       throw new ExpectedKryoTestException();
+               }
+       }
 }

Reply via email to