[FLINK-5995] [checkpoints] Fix serializer initialization for Operator State

This closes #3503


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614abd29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614abd29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614abd29

Branch: refs/heads/master
Commit: 614abd29ee2a33be5cd98a6ce55abd1b605fc296
Parents: 486f724
Author: 金竹 <[email protected]>
Authored: Fri Mar 10 09:29:57 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Mar 20 13:20:03 2017 +0100

----------------------------------------------------------------------
 .../runtime/state/AbstractStateBackend.java     |  2 +-
 .../state/DefaultOperatorStateBackend.java      | 12 ++++++++--
 .../runtime/state/OperatorStateBackendTest.java | 25 ++++++++++++++++++++
 3 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 2cf20a1..74025bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -95,7 +95,7 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
                        Environment env,
                        String operatorIdentifier) throws Exception {
 
-               return new DefaultOperatorStateBackend(env.getUserClassLoader());
+               return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig());
        }
 
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 2497a00..71cccae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -56,15 +57,20 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
        private final CloseableRegistry closeStreamOnCancelRegistry;
        private final JavaSerializer<Serializable> javaSerializer;
        private final ClassLoader userClassloader;
+       private final ExecutionConfig executionConfig;
 
-       public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException {
-
+       public DefaultOperatorStateBackend(ClassLoader userClassLoader, ExecutionConfig executionConfig) throws IOException {
                this.closeStreamOnCancelRegistry = new CloseableRegistry();
                this.userClassloader = Preconditions.checkNotNull(userClassLoader);
+               this.executionConfig = executionConfig;
                this.javaSerializer = new JavaSerializer<>();
                this.registeredStates = new HashMap<>();
        }
 
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
+       }
+
        @Override
        public Set<String> getRegisteredStateNames() {
                return registeredStates.keySet();
@@ -106,6 +112,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
                Preconditions.checkNotNull(stateDescriptor);
 
+               stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+
                String name = Preconditions.checkNotNull(stateDescriptor.getName());
                TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index d883d6e..157d5ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -28,6 +29,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.RunnableFuture;
@@ -45,6 +47,8 @@ public class OperatorStateBackendTest {
 
        static Environment createMockEnvironment() {
                Environment env = mock(Environment.class);
+               ExecutionConfig config = mock(ExecutionConfig.class);
+               when(env.getExecutionConfig()).thenReturn(config);
                when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
                return env;
        }
@@ -64,6 +68,27 @@ public class OperatorStateBackendTest {
        }
 
        @Test
+       public void testRegisterStatesWithoutTypeSerializer() throws Exception {
+               DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+               ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
+               ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
+               ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor);
+               assertNotNull(listState);
+               ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
+               assertNotNull(listState2);
+               assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
+               Iterator<String> it = listState2.get().iterator();
+               assertTrue(!it.hasNext());
+               listState2.add("kevin");
+               listState2.add("sunny");
+
+               it = listState2.get().iterator();
+               assertEquals("kevin", it.next());
+               assertEquals("sunny", it.next());
+               assertTrue(!it.hasNext());
+       }
+
+       @Test
        public void testRegisterStates() throws Exception {
                DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
                ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());

Reply via email to