Repository: flink Updated Branches: refs/heads/master 609b2577a -> 264f6df8e
[FLINK-6018] Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState() Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/264f6df8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/264f6df8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/264f6df8 Branch: refs/heads/master Commit: 264f6df8e0c0fb2f9dfb0cd9beab9d380dc8e00c Parents: 609b257 Author: 金竹 <[email protected]> Authored: Tue Mar 14 22:26:53 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Mar 15 18:43:58 2017 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBKeyedStateBackend.java | 6 ++++-- .../contrib/streaming/state/RocksDBStateBackend.java | 3 ++- .../flink/runtime/state/AbstractKeyedStateBackend.java | 11 ++++++----- .../flink/runtime/state/filesystem/FsStateBackend.java | 3 ++- .../flink/runtime/state/heap/HeapKeyedStateBackend.java | 6 ++++-- .../flink/runtime/state/memory/MemoryStateBackend.java | 3 ++- .../netty/message/KvStateRequestSerializerTest.java | 7 +++++-- .../runtime/state/heap/HeapAggregatingStateTest.java | 3 ++- .../flink/runtime/state/heap/HeapListStateTest.java | 3 ++- .../flink/runtime/state/heap/HeapReducingStateTest.java | 3 ++- .../test/query/KVStateRequestSerializerRocksDBTest.java | 12 ++++++++---- 11 files changed, 39 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index eb926c0..aaccc2f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -17,6 +17,7 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; @@ -144,10 +145,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, int numberOfKeyGroups, - KeyGroupRange keyGroupRange + KeyGroupRange keyGroupRange, + ExecutionConfig executionConfig ) throws IOException { - super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange); + super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions); this.dbOptions = Preconditions.checkNotNull(dbOptions); http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index dd0e2f7..16b0869 100644 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -259,7 +259,8 @@ public class RocksDBStateBackend extends AbstractStateBackend { kvStateRegistry, keySerializer, numberOfKeyGroups, - keyGroupRange); + keyGroupRange, + env.getExecutionConfig()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 14f897f..aba00f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -94,12 +94,15 @@ public abstract class AbstractKeyedStateBackend<K> protected final ClassLoader userCodeClassLoader; + private final ExecutionConfig executionConfig; + public AbstractKeyedStateBackend( TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, - KeyGroupRange keyGroupRange) { + KeyGroupRange keyGroupRange, + ExecutionConfig executionConfig) { this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry); this.keySerializer = Preconditions.checkNotNull(keySerializer); @@ -108,6 +111,7 @@ public abstract class AbstractKeyedStateBackend<K> this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); this.cancelStreamRegistry = new CloseableRegistry(); this.keyValueStatesByName = new HashMap<>(); + 
this.executionConfig = executionConfig; } /** @@ -349,10 +353,7 @@ public abstract class AbstractKeyedStateBackend<K> checkNotNull(namespace, "Namespace"); - // TODO: This is wrong, it should throw an exception that the initialization has not properly happened - if (!stateDescriptor.isSerializerInitialized()) { - stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); - } + stateDescriptor.initializeSerializerUnlessSet(executionConfig); if (lastName != null && lastName.equals(stateDescriptor.getName())) { lastState.setCurrentNamespace(namespace); http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 5e8a15d..2e9198f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -208,7 +208,8 @@ public class FsStateBackend extends AbstractStateBackend { keySerializer, env.getUserClassLoader(), numberOfKeyGroups, - keyGroupRange); + keyGroupRange, + env.getExecutionConfig()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 4a5455a..a4a08c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.commons.io.IOUtils; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -100,9 +101,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, - KeyGroupRange keyGroupRange) { + KeyGroupRange keyGroupRange, + ExecutionConfig executionConfig) { - super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange); + super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); LOG.info("Initializing heap keyed state backend with stream factory."); } http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 6e6b034..da01c09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -97,6 +97,7 @@ public class MemoryStateBackend extends AbstractStateBackend { keySerializer, env.getUserClassLoader(), numberOfKeyGroups, - keyGroupRange); + keyGroupRange, + env.getExecutionConfig()); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index dd61a3f..93094a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -331,7 +332,8 @@ public class KvStateRequestSerializerTest { mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, ClassLoader.getSystemClassLoader(), - 1, new KeyGroupRange(0, 0) + 1, new KeyGroupRange(0, 0), + new ExecutionConfig() ); longHeapKeyedStateBackend.setCurrentKey(key); @@ -430,7 +432,8 @@ public class KvStateRequestSerializerTest { mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, ClassLoader.getSystemClassLoader(), - 1, new KeyGroupRange(0, 0) + 1, new KeyGroupRange(0, 0), + new ExecutionConfig() ); longHeapKeyedStateBackend.setCurrentKey(key); http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java index a7ae5be..735b5f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java @@ -236,7 +236,8 @@ public class HeapAggregatingStateTest { StringSerializer.INSTANCE, HeapAggregatingStateTest.class.getClassLoader(), 16, - new KeyGroupRange(0, 15)); + new KeyGroupRange(0, 15), + new ExecutionConfig()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java index 746db28..c36a48b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java @@ -232,7 +232,8 @@ public class HeapListStateTest { StringSerializer.INSTANCE, HeapListStateTest.class.getClassLoader(), 16, - new KeyGroupRange(0, 15)); + new KeyGroupRange(0, 15), + new ExecutionConfig()); } private static <T> void validateResult(Iterable<T> values, Set<T> expected) { http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java index e0929f1..63eec04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java @@ -236,7 +236,8 @@ public class HeapReducingStateTest { StringSerializer.INSTANCE, HeapReducingStateTest.class.getClassLoader(), 16, - new KeyGroupRange(0, 15)); + new KeyGroupRange(0, 15), + new ExecutionConfig()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java index 6e2fd62..3c86f90 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java @@ -18,6 +18,7 @@ package org.apache.flink.test.query; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; @@ -74,12 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest { final TaskKvStateRegistry kvStateRegistry, final TypeSerializer<K> keySerializer, final int numberOfKeyGroups, - final KeyGroupRange keyGroupRange) throws Exception { + final KeyGroupRange keyGroupRange, + final ExecutionConfig executionConfig) throws Exception { super(jobId, operatorIdentifier, userCodeClassLoader, instanceBasePath, dbOptions, columnFamilyOptions, 
kvStateRegistry, keySerializer, - numberOfKeyGroups, keyGroupRange); + numberOfKeyGroups, keyGroupRange, executionConfig); } @Override @@ -115,7 +117,8 @@ public final class KVStateRequestSerializerRocksDBTest { columnFamilyOptions, mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0) + 1, new KeyGroupRange(0, 0), + new ExecutionConfig() ); longHeapKeyedStateBackend.setCurrentKey(key); @@ -150,7 +153,8 @@ public final class KVStateRequestSerializerRocksDBTest { columnFamilyOptions, mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0) + 1, new KeyGroupRange(0, 0), + new ExecutionConfig() ); longHeapKeyedStateBackend.setCurrentKey(key);
