[FLINK-5995] [checkpoints] Fix serializer initialization for Operator State
This closes #3503 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614abd29 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614abd29 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614abd29 Branch: refs/heads/master Commit: 614abd29ee2a33be5cd98a6ce55abd1b605fc296 Parents: 486f724 Author: é竹 <[email protected]> Authored: Fri Mar 10 09:29:57 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Mon Mar 20 13:20:03 2017 +0100 ---------------------------------------------------------------------- .../runtime/state/AbstractStateBackend.java | 2 +- .../state/DefaultOperatorStateBackend.java | 12 ++++++++-- .../runtime/state/OperatorStateBackendTest.java | 25 ++++++++++++++++++++ 3 files changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 2cf20a1..74025bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -95,7 +95,7 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri Environment env, String operatorIdentifier) throws Exception { - return new DefaultOperatorStateBackend(env.getUserClassLoader()); + return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 2497a00..71cccae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; @@ -56,15 +57,20 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { private final CloseableRegistry closeStreamOnCancelRegistry; private final JavaSerializer<Serializable> javaSerializer; private final ClassLoader userClassloader; + private final ExecutionConfig executionConfig; - public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException { - + public DefaultOperatorStateBackend(ClassLoader userClassLoader, ExecutionConfig executionConfig) throws IOException { this.closeStreamOnCancelRegistry = new CloseableRegistry(); this.userClassloader = Preconditions.checkNotNull(userClassLoader); + this.executionConfig = executionConfig; this.javaSerializer = new JavaSerializer<>(); this.registeredStates = new HashMap<>(); } + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + @Override public Set<String> getRegisteredStateNames() { return registeredStates.keySet(); @@ -106,6 +112,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { Preconditions.checkNotNull(stateDescriptor); + stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); + String name = Preconditions.checkNotNull(stateDescriptor.getName()); TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer()); http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index d883d6e..157d5ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.state; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -28,6 +29,7 @@ import org.junit.Assert; import org.junit.Test; import java.io.Serializable; +import java.io.File; import java.util.Collections; import java.util.Iterator; import java.util.concurrent.RunnableFuture; @@ -45,6 +47,8 @@ public class OperatorStateBackendTest { static Environment createMockEnvironment() { Environment env = mock(Environment.class); + ExecutionConfig config = mock(ExecutionConfig.class); + when(env.getExecutionConfig()).thenReturn(config); when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader()); return env; } @@ -64,6 +68,27 @@ public class OperatorStateBackendTest { } @Test + public void testRegisterStatesWithoutTypeSerializer() throws Exception { + DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class); + ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class); + ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor); + assertNotNull(listState); + ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2); + assertNotNull(listState2); + assertEquals(2, operatorStateBackend.getRegisteredStateNames().size()); + Iterator<String> it = listState2.get().iterator(); + assertTrue(!it.hasNext()); + listState2.add("kevin"); + listState2.add("sunny"); + + it = listState2.get().iterator(); + assertEquals("kevin", it.next()); + assertEquals("sunny", it.next()); + assertTrue(!it.hasNext()); + } + + @Test public void testRegisterStates() throws Exception { DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
