Repository: flink
Updated Branches:
  refs/heads/master fe6b83585 -> 60a4ab32e


[FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value 
Behaviour


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60a4ab32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60a4ab32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60a4ab32

Branch: refs/heads/master
Commit: 60a4ab32e1662310da4633a97e02dca62431952e
Parents: fe6b835
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Nov 23 12:13:05 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Nov 28 17:59:02 2016 +0100

----------------------------------------------------------------------
 .../runtime/state/StateBackendTestBase.java     | 143 ++++++++++++++++++-
 1 file changed, 137 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60a4ab32/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 9e835ce..0a3a092 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -56,12 +56,8 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RunnableFuture;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -659,6 +655,141 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
        }
 
        /**
+        * Verify that {@link ValueStateDescriptor} allows {@code null} as 
default.
+        */
+       @Test
+       public void testValueStateNullAsDefaultValue() throws Exception {
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+               ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+               ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+               backend.setCurrentKey(1);
+
+               // Nothing has been written for this key yet, so the null default is expected.
+               assertNull(state.value());
+
+               state.update("Ciao");
+               assertEquals("Ciao", state.value());
+
+               // After clearing, reads are expected to fall back to the null default again.
+               state.clear();
+               assertNull(state.value());
+               backend.dispose();
+       }
+
+
+       /**
+        * Checks that a {@code ValueState} holding no value returns the default value given to
+        * its descriptor, both before the first {@code update()} and after {@code clear()}.
+        */
+       @Test
+       public void testValueStateDefaultValue() throws Exception {
+               AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
+
+               ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("id", String.class, "Hello");
+               descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               ValueState<String> valueState = keyedBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE, descriptor);
+               keyedBackend.setCurrentKey(1);
+
+               assertEquals("Hello", valueState.value());
+               valueState.update("Ciao");
+               assertEquals("Ciao", valueState.value());
+               valueState.clear();
+               assertEquals("Hello", valueState.value());
+               keyedBackend.dispose();
+       }
+
+       /**
+        * Checks that a {@code ReducingState} without any added element returns {@code null},
+        * both before the first {@code add()} and after {@code clear()}.
+        */
+       @Test
+       public void testReducingStateDefaultValue() throws Exception {
+               AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
+
+               ReducingStateDescriptor<String> descriptor =
+                               new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
+               descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               ReducingState<String> reducingState = keyedBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
+               keyedBackend.setCurrentKey(1);
+
+               assertNull(reducingState.get());
+               reducingState.add("Ciao");
+               assertEquals("Ciao", reducingState.get());
+
+               reducingState.clear();
+               assertNull(reducingState.get());
+
+               keyedBackend.dispose();
+       }
+
+       /**
+        * Checks that a {@code FoldingState} without any added element returns {@code null}, not the
+        * "Fold-Initial:" value given to its descriptor, before the first {@code add()} and after {@code clear()}.
+        */
+       @Test
+       public void testFoldingStateDefaultValue() throws Exception {
+               AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
+
+               FoldingStateDescriptor<Integer, String> descriptor = new FoldingStateDescriptor<>(
+                               "id", "Fold-Initial:", new AppendingFold(), String.class);
+               descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               FoldingState<Integer, String> foldingState = keyedBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
+               keyedBackend.setCurrentKey(1);
+
+               assertNull(foldingState.get());
+
+               // The folded result is expected to start with the value handed to the descriptor.
+               foldingState.add(1);
+               foldingState.add(2);
+               assertEquals("Fold-Initial:,1,2", foldingState.get());
+
+               foldingState.clear();
+               assertNull(foldingState.get());
+
+               keyedBackend.dispose();
+       }
+
+
+       /**
+        * Checks that a {@code ListState} without any added element returns {@code null},
+        * both before the first {@code add()} and after {@code clear()}.
+        */
+       @Test
+       public void testListStateDefaultValue() throws Exception {
+               AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
+
+               ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("id", String.class);
+               descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               ListState<String> listState = keyedBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
+               keyedBackend.setCurrentKey(1);
+
+               assertNull(listState.get());
+
+               listState.add("Ciao");
+               listState.add("Bello");
+               assertThat(listState.get(), containsInAnyOrder("Ciao", "Bello"));
+
+               listState.clear();
+               assertNull(listState.get());
+
+               keyedBackend.dispose();
+       }
+
+
+
+
+       /**
         * This test verifies that state is correctly assigned to key groups 
and that restore
         * restores the relevant key groups in the backend.
         *

Reply via email to