[FLINK-5995] [checkpoints] Harden test for state descriptor passing to OperatorState
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f700caf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f700caf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f700caf Branch: refs/heads/master Commit: 3f700cafbb21c98a94c2ad21b90c1823963fed29 Parents: 614abd2 Author: Stephan Ewen <[email protected]> Authored: Fri Mar 17 13:52:36 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Mar 20 13:20:03 2017 +0100 ---------------------------------------------------------------------- .../runtime/state/OperatorStateBackendTest.java | 89 ++++++++++++++------ 1 file changed, 63 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3f700caf/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 157d5ee..bc446f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -21,21 +21,27 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; + import org.apache.flink.util.FutureUtil; -import org.junit.Assert; import org.junit.Test; import java.io.Serializable; import java.io.File; import java.util.Collections; import java.util.Iterator; +import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -43,54 +49,67 @@ import static org.mockito.Mockito.when; public class OperatorStateBackendTest { - AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); - - static Environment createMockEnvironment() { - Environment env = mock(Environment.class); - ExecutionConfig config = mock(ExecutionConfig.class); - when(env.getExecutionConfig()).thenReturn(config); - when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader()); - return env; - } - - private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception { - //TODO this is temporarily casted to test already functionality that we do not yet expose through public API - return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend( - createMockEnvironment(), - "test-operator"); - } + private final ClassLoader classLoader = getClass().getClassLoader(); @Test - public void testCreateNew() throws Exception { - OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + public void testCreateOnAbstractStateBackend() throws Exception { + // we use the memory state backend as a subclass of the AbstractStateBackend + final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(); + OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend( + createMockEnvironment(), "test-operator"); + assertNotNull(operatorStateBackend); assertTrue(operatorStateBackend.getRegisteredStateNames().isEmpty()); } @Test public void testRegisterStatesWithoutTypeSerializer() throws Exception { - DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + // prepare an execution config with a non standard type registered + final Class<?> registeredType = FutureTask.class; + + // validate the precondition of this test - if this condition fails, we need to pick a different + // example serializer + assertFalse(new KryoSerializer<>(File.class, new ExecutionConfig()).getKryo().getDefaultSerializer(registeredType) + instanceof com.esotericsoftware.kryo.serializers.JavaSerializer); + + final ExecutionConfig cfg = new ExecutionConfig(); + cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class); + + final DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg); + ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class); ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class); + ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor); assertNotNull(listState); + ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2); assertNotNull(listState2); + assertEquals(2, operatorStateBackend.getRegisteredStateNames().size()); + + // make sure that type registrations are forwarded + TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getPartitionStateSerializer(); + assertTrue(serializer instanceof KryoSerializer); + assertTrue(((KryoSerializer<?>) serializer).getKryo().getSerializer(registeredType) + instanceof com.esotericsoftware.kryo.serializers.JavaSerializer); + Iterator<String> it = listState2.get().iterator(); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); listState2.add("kevin"); listState2.add("sunny"); it = listState2.get().iterator(); assertEquals("kevin", it.next()); assertEquals("sunny", it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); } @Test public void testRegisterStates() throws Exception { - DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + final DefaultOperatorStateBackend operatorStateBackend = + new DefaultOperatorStateBackend(classLoader, new ExecutionConfig()); + ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); @@ -173,7 +192,11 @@ public class OperatorStateBackendTest { @Test public void testSnapshotEmpty() throws Exception { - DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); + + final DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend) + abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator"); + CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); @@ -181,12 +204,16 @@ public class OperatorStateBackendTest { operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); - Assert.assertNull(stateHandle); + assertNull(stateHandle); } @Test public void testSnapshotRestore() throws Exception { - DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); + + DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend) + abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name"); + ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); @@ -255,4 +282,14 @@ public class OperatorStateBackendTest { } } + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private static Environment createMockEnvironment() { + Environment env = mock(Environment.class); + when(env.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader()); + return env; + } }
