Repository: flink
Updated Branches:
  refs/heads/master 609b2577a -> 264f6df8e


[FLINK-6018] Properly initialise StateDescriptor in 
AbstractStateBackend.getPartitionedState()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/264f6df8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/264f6df8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/264f6df8

Branch: refs/heads/master
Commit: 264f6df8e0c0fb2f9dfb0cd9beab9d380dc8e00c
Parents: 609b257
Author: 金竹 <[email protected]>
Authored: Tue Mar 14 22:26:53 2017 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Mar 15 18:43:58 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBKeyedStateBackend.java       |  6 ++++--
 .../contrib/streaming/state/RocksDBStateBackend.java    |  3 ++-
 .../flink/runtime/state/AbstractKeyedStateBackend.java  | 11 ++++++-----
 .../flink/runtime/state/filesystem/FsStateBackend.java  |  3 ++-
 .../flink/runtime/state/heap/HeapKeyedStateBackend.java |  6 ++++--
 .../flink/runtime/state/memory/MemoryStateBackend.java  |  3 ++-
 .../netty/message/KvStateRequestSerializerTest.java     |  7 +++++--
 .../runtime/state/heap/HeapAggregatingStateTest.java    |  3 ++-
 .../flink/runtime/state/heap/HeapListStateTest.java     |  3 ++-
 .../flink/runtime/state/heap/HeapReducingStateTest.java |  3 ++-
 .../test/query/KVStateRequestSerializerRocksDBTest.java | 12 ++++++++----
 11 files changed, 39 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index eb926c0..aaccc2f 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -144,10 +145,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange
+                       KeyGroupRange keyGroupRange,
+                       ExecutionConfig executionConfig
        ) throws IOException {
 
-               super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange);
+               super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
                this.columnOptions = 
Preconditions.checkNotNull(columnFamilyOptions);
                this.dbOptions = Preconditions.checkNotNull(dbOptions);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index dd0e2f7..16b0869 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -259,7 +259,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                                kvStateRegistry,
                                keySerializer,
                                numberOfKeyGroups,
-                               keyGroupRange);
+                               keyGroupRange,
+                               env.getExecutionConfig());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 14f897f..aba00f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -94,12 +94,15 @@ public abstract class AbstractKeyedStateBackend<K>
 
        protected final ClassLoader userCodeClassLoader;
 
+       private final ExecutionConfig executionConfig;
+
        public AbstractKeyedStateBackend(
                        TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
                        ClassLoader userCodeClassLoader,
                        int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange) {
+                       KeyGroupRange keyGroupRange,
+                       ExecutionConfig executionConfig) {
 
                this.kvStateRegistry = 
kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -108,6 +111,7 @@ public abstract class AbstractKeyedStateBackend<K>
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
                this.cancelStreamRegistry = new CloseableRegistry();
                this.keyValueStatesByName = new HashMap<>();
+               this.executionConfig = executionConfig;
        }
 
        /**
@@ -349,10 +353,7 @@ public abstract class AbstractKeyedStateBackend<K>
 
                checkNotNull(namespace, "Namespace");
 
-               // TODO: This is wrong, it should throw an exception that the 
initialization has not properly happened
-               if (!stateDescriptor.isSerializerInitialized()) {
-                       stateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
+               stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 
                if (lastName != null && 
lastName.equals(stateDescriptor.getName())) {
                        lastState.setCurrentNamespace(namespace);

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 5e8a15d..2e9198f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -208,7 +208,8 @@ public class FsStateBackend extends AbstractStateBackend {
                                keySerializer,
                                env.getUserClassLoader(),
                                numberOfKeyGroups,
-                               keyGroupRange);
+                               keyGroupRange,
+                               env.getExecutionConfig());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 4a5455a..a4a08c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -100,9 +101,10 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        TypeSerializer<K> keySerializer,
                        ClassLoader userCodeClassLoader,
                        int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange) {
+                       KeyGroupRange keyGroupRange,
+                       ExecutionConfig executionConfig) {
 
-               super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange);
+               super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
 
                LOG.info("Initializing heap keyed state backend with stream 
factory.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 6e6b034..da01c09 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -97,6 +97,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
                                keySerializer,
                                env.getUserClassLoader(),
                                numberOfKeyGroups,
-                               keyGroupRange);
+                               keyGroupRange,
+                               env.getExecutionConfig());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index dd61a3f..93094a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -331,7 +332,8 @@ public class KvStateRequestSerializerTest {
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
                                ClassLoader.getSystemClassLoader(),
-                               1, new KeyGroupRange(0, 0)
+                               1, new KeyGroupRange(0, 0),
+                               new ExecutionConfig()
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -430,7 +432,8 @@ public class KvStateRequestSerializerTest {
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
                                ClassLoader.getSystemClassLoader(),
-                               1, new KeyGroupRange(0, 0)
+                               1, new KeyGroupRange(0, 0),
+                               new ExecutionConfig()
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
index a7ae5be..735b5f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -236,7 +236,8 @@ public class HeapAggregatingStateTest {
                                StringSerializer.INSTANCE,
                                HeapAggregatingStateTest.class.getClassLoader(),
                                16,
-                               new KeyGroupRange(0, 15));
+                               new KeyGroupRange(0, 15),
+                               new ExecutionConfig());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 746db28..c36a48b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -232,7 +232,8 @@ public class HeapListStateTest {
                                StringSerializer.INSTANCE,
                                HeapListStateTest.class.getClassLoader(),
                                16,
-                               new KeyGroupRange(0, 15));
+                               new KeyGroupRange(0, 15),
+                               new ExecutionConfig());
        }
        
        private static <T> void validateResult(Iterable<T> values, Set<T> 
expected) {

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
index e0929f1..63eec04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -236,7 +236,8 @@ public class HeapReducingStateTest {
                                StringSerializer.INSTANCE,
                                HeapReducingStateTest.class.getClassLoader(),
                                16,
-                               new KeyGroupRange(0, 15));
+                               new KeyGroupRange(0, 15),
+                               new ExecutionConfig());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 6e2fd62..3c86f90 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.query;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -74,12 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest {
                                final TaskKvStateRegistry kvStateRegistry,
                                final TypeSerializer<K> keySerializer,
                                final int numberOfKeyGroups,
-                               final KeyGroupRange keyGroupRange) throws 
Exception {
+                               final KeyGroupRange keyGroupRange,
+                               final ExecutionConfig executionConfig) throws 
Exception {
 
                        super(jobId, operatorIdentifier, userCodeClassLoader,
                                instanceBasePath,
                                dbOptions, columnFamilyOptions, 
kvStateRegistry, keySerializer,
-                               numberOfKeyGroups, keyGroupRange);
+                               numberOfKeyGroups, keyGroupRange, 
executionConfig);
                }
 
                @Override
@@ -115,7 +117,8 @@ public final class KVStateRequestSerializerRocksDBTest {
                                columnFamilyOptions,
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
-                               1, new KeyGroupRange(0, 0)
+                               1, new KeyGroupRange(0, 0),
+                               new ExecutionConfig()
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -150,7 +153,8 @@ public final class KVStateRequestSerializerRocksDBTest {
                                columnFamilyOptions,
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
-                               1, new KeyGroupRange(0, 0)
+                               1, new KeyGroupRange(0, 0),
+                               new ExecutionConfig()
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 

Reply via email to