[FLINK-5995] [checkpoints] Harden test for state descriptor passing to OperatorState


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f700caf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f700caf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f700caf

Branch: refs/heads/master
Commit: 3f700cafbb21c98a94c2ad21b90c1823963fed29
Parents: 614abd2
Author: Stephan Ewen <[email protected]>
Authored: Fri Mar 17 13:52:36 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Mar 20 13:20:03 2017 +0100

----------------------------------------------------------------------
 .../runtime/state/OperatorStateBackendTest.java | 89 ++++++++++++++------
 1 file changed, 63 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f700caf/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 157d5ee..bc446f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,21 +21,27 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
 import org.apache.flink.util.FutureUtil;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -43,54 +49,67 @@ import static org.mockito.Mockito.when;
 
 public class OperatorStateBackendTest {
 
-       AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
-
-       static Environment createMockEnvironment() {
-               Environment env = mock(Environment.class);
-               ExecutionConfig config = mock(ExecutionConfig.class);
-               when(env.getExecutionConfig()).thenReturn(config);
-               
when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
-               return env;
-       }
-
-       private DefaultOperatorStateBackend createNewOperatorStateBackend() 
throws Exception {
-               //TODO this is temporarily casted to test already functionality 
that we do not yet expose through public API
-               return (DefaultOperatorStateBackend) 
abstractStateBackend.createOperatorStateBackend(
-                               createMockEnvironment(),
-                               "test-operator");
-       }
+       private final ClassLoader classLoader = getClass().getClassLoader();
 
        @Test
-       public void testCreateNew() throws Exception {
-               OperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
+       public void testCreateOnAbstractStateBackend() throws Exception {
+               // we use the memory state backend as a subclass of the AbstractStateBackend
+               final AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend();
+               OperatorStateBackend operatorStateBackend = 
abstractStateBackend.createOperatorStateBackend(
+                               createMockEnvironment(), "test-operator");
+
                assertNotNull(operatorStateBackend);
                
assertTrue(operatorStateBackend.getRegisteredStateNames().isEmpty());
        }
 
        @Test
        public void testRegisterStatesWithoutTypeSerializer() throws Exception {
-               DefaultOperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
+               // prepare an execution config with a non standard type registered
+               final Class<?> registeredType = FutureTask.class;
+
+               // validate the precondition of this test - if this condition fails, we need to pick a different
+               // example serializer
+               assertFalse(new KryoSerializer<>(File.class, new 
ExecutionConfig()).getKryo().getDefaultSerializer(registeredType)
+                               instanceof 
com.esotericsoftware.kryo.serializers.JavaSerializer);
+
+               final ExecutionConfig cfg = new ExecutionConfig();
+               cfg.registerTypeWithKryoSerializer(registeredType, 
com.esotericsoftware.kryo.serializers.JavaSerializer.class);
+
+               final DefaultOperatorStateBackend operatorStateBackend = new 
DefaultOperatorStateBackend(classLoader, cfg);
+
                ListStateDescriptor<File> stateDescriptor = new 
ListStateDescriptor<>("test", File.class);
                ListStateDescriptor<String> stateDescriptor2 = new 
ListStateDescriptor<>("test2", String.class);
+
                ListState<File> listState = 
operatorStateBackend.getOperatorState(stateDescriptor);
                assertNotNull(listState);
+
                ListState<String> listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
                assertNotNull(listState2);
+
                assertEquals(2, 
operatorStateBackend.getRegisteredStateNames().size());
+
+               // make sure that type registrations are forwarded
+               TypeSerializer<?> serializer = ((PartitionableListState<?>) 
listState).getPartitionStateSerializer();
+               assertTrue(serializer instanceof KryoSerializer);
+               assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getSerializer(registeredType)
+                               instanceof 
com.esotericsoftware.kryo.serializers.JavaSerializer);
+
                Iterator<String> it = listState2.get().iterator();
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
                listState2.add("kevin");
                listState2.add("sunny");
 
                it = listState2.get().iterator();
                assertEquals("kevin", it.next());
                assertEquals("sunny", it.next());
-               assertTrue(!it.hasNext());
+               assertFalse(it.hasNext());
        }
 
        @Test
        public void testRegisterStates() throws Exception {
-               DefaultOperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
+               final DefaultOperatorStateBackend operatorStateBackend =
+                               new DefaultOperatorStateBackend(classLoader, 
new ExecutionConfig());
+
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor3 = new 
ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -173,7 +192,11 @@ public class OperatorStateBackendTest {
 
        @Test
        public void testSnapshotEmpty() throws Exception {
-               DefaultOperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
+               final AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
+
+               final DefaultOperatorStateBackend operatorStateBackend = 
(DefaultOperatorStateBackend)
+                               
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), 
"testOperator");
+
                CheckpointStreamFactory streamFactory =
                                abstractStateBackend.createStreamFactory(new 
JobID(), "testOperator");
 
@@ -181,12 +204,16 @@ public class OperatorStateBackendTest {
                                operatorStateBackend.snapshot(0L, 0L, 
streamFactory, CheckpointOptions.forFullCheckpoint());
 
                OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(snapshot);
-               Assert.assertNull(stateHandle);
+               assertNull(stateHandle);
        }
 
        @Test
        public void testSnapshotRestore() throws Exception {
-               DefaultOperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
+               AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
+
+               DefaultOperatorStateBackend operatorStateBackend = 
(DefaultOperatorStateBackend)
+                               
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), 
"test-op-name");
+
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor3 = new 
ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -255,4 +282,14 @@ public class OperatorStateBackendTest {
                }
        }
 
+       // ------------------------------------------------------------------------
+       //  utilities
+       // ------------------------------------------------------------------------
+
+       private static Environment createMockEnvironment() {
+               Environment env = mock(Environment.class);
+               when(env.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               
when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+               return env;
+       }
 }

Reply via email to