Repository: flink Updated Branches: refs/heads/master fe6b83585 -> 60a4ab32e
[FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value Behaviour Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60a4ab32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60a4ab32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60a4ab32 Branch: refs/heads/master Commit: 60a4ab32e1662310da4633a97e02dca62431952e Parents: fe6b835 Author: Aljoscha Krettek <[email protected]> Authored: Wed Nov 23 12:13:05 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Nov 28 17:59:02 2016 +0100 ---------------------------------------------------------------------- .../runtime/state/StateBackendTestBase.java | 143 ++++++++++++++++++- 1 file changed, 137 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60a4ab32/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 9e835ce..0a3a092 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -56,12 +56,8 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RunnableFuture; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -659,6 +655,141 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { } /** + * Verify that {@link ValueStateDescriptor} allows {@code null} as default. + */ + @Test + public void testValueStateNullAsDefaultValue() throws Exception { + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertEquals(null, state.value()); + + state.update("Ciao"); + assertEquals("Ciao", state.value()); + + state.clear(); + assertEquals(null, state.value()); + + backend.dispose(); + } + + + /** + * Verify that an empty {@code ValueState} will yield the default value. + */ + @Test + public void testValueStateDefaultValue() throws Exception { + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, "Hello"); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertEquals("Hello", state.value()); + + state.update("Ciao"); + assertEquals("Ciao", state.value()); + + state.clear(); + assertEquals("Hello", state.value()); + + backend.dispose(); + } + + /** + * Verify that an empty {@code ReduceState} yields {@code null}. + */ + @Test + public void testReducingStateDefaultValue() throws Exception { + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ReducingState<String> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertNull(state.get()); + + state.add("Ciao"); + assertEquals("Ciao", state.get()); + + state.clear(); + assertNull(state.get()); + + backend.dispose(); + } + + /** + * Verify that an empty {@code FoldingState} yields {@code null}. + */ + @Test + public void testFoldingStateDefaultValue() throws Exception { + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + FoldingStateDescriptor<Integer, String> kvId = + new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class); + + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + FoldingState<Integer, String> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertNull(state.get()); + + state.add(1); + state.add(2); + assertEquals("Fold-Initial:,1,2", state.get()); + + state.clear(); + assertNull(state.get()); + + backend.dispose(); + } + + + /** + * Verify that an empty {@code ListState} yields {@code null}. + */ + @Test + public void testListStateDefaultValue() throws Exception { + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ListState<String> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertNull(state.get()); + + state.add("Ciao"); + state.add("Bello"); + assertThat(state.get(), containsInAnyOrder("Ciao", "Bello")); + + state.clear(); + assertNull(state.get()); + + backend.dispose(); + } + + + + + /** * This test verifies that state is correctly assigned to key groups and that restore * restores the relevant key groups in the backend. *
