[FLINK-6018] Add tests for KryoSerializer restore with registered types This commit also renames isCompatibleWith() to canRestoreFrom() to make the method asymetric because in the case of KryoSerializer we can restore from state that was stored using no registed types/serializers while the other way around is not possible.
This closes #3534. This closes #3603. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09164cf2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09164cf2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09164cf2 Branch: refs/heads/table-retraction Commit: 09164cf2388888bc2f92f0ca63bb1f15283e895c Parents: 68289b1 Author: Aljoscha Krettek <[email protected]> Authored: Thu Mar 16 15:17:05 2017 +0100 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Mar 24 12:34:03 2017 +0800 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 2 +- .../api/common/typeutils/TypeSerializer.java | 2 +- .../typeutils/runtime/kryo/KryoSerializer.java | 16 + .../AbstractKeyedCEPPatternOperator.java | 2 +- .../state/AbstractKeyedStateBackend.java | 4 +- .../state/DefaultOperatorStateBackend.java | 6 +- .../state/RegisteredBackendStateMetaInfo.java | 6 +- .../state/heap/HeapKeyedStateBackend.java | 2 +- .../runtime/state/StateBackendTestBase.java | 1346 ++++++++++++------ 9 files changed, 967 insertions(+), 419 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 5b72e03..2ce527f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -821,7 +821,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { descriptor.getSerializer()); if (stateInfo != null) { - if (newMetaInfo.isCompatibleWith(stateInfo.f1)) { + if (newMetaInfo.canRestoreFrom(stateInfo.f1)) { stateInfo.f1 = newMetaInfo; return stateInfo.f0; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index ac7fbc8..6edaec6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -161,7 +161,7 @@ public abstract class TypeSerializer<T> implements Serializable { public abstract int hashCode(); - public boolean isCompatibleWith(TypeSerializer<?> other) { + public boolean canRestoreFrom(TypeSerializer<?> other) { return equals(other); } } http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index 44c952a..cba0c84 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -383,4 +383,20 @@ public class KryoSerializer<T> extends TypeSerializer<T> { checkKryoInitialized(); return this.kryo; } + + @Override + public boolean canRestoreFrom(TypeSerializer<?> other) { + if (other instanceof KryoSerializer) { + KryoSerializer<?> otherKryo = (KryoSerializer<?>) other; + + // we cannot include the Serializers here because they don't implement the equals method + return other.canEqual(this) && + type == otherKryo.type && + (registeredTypes.equals(otherKryo.registeredTypes) || otherKryo.registeredTypes.isEmpty()) && + (registeredTypesWithSerializerClasses.equals(otherKryo.registeredTypesWithSerializerClasses) || otherKryo.registeredTypesWithSerializerClasses.isEmpty()) && + (defaultSerializerClasses.equals(otherKryo.defaultSerializerClasses) || otherKryo.defaultSerializerClasses.isEmpty()); + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index b6d57cd..3e18660 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -404,7 +404,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> } @Override - public boolean isCompatibleWith(TypeSerializer<?> other) { + public boolean canRestoreFrom(TypeSerializer<?> other) { return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer; } http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 1f2f4a2..e6e7b23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -277,7 +277,7 @@ public abstract class AbstractKeyedStateBackend<K> } if (!stateDescriptor.isSerializerInitialized()) { - throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); + stateDescriptor.initializeSerializerUnlessSet(executionConfig); } InternalKvState<?> existing = keyValueStatesByName.get(stateDescriptor.getName()); @@ -355,8 +355,6 @@ public abstract class AbstractKeyedStateBackend<K> checkNotNull(namespace, "Namespace"); - stateDescriptor.initializeSerializerUnlessSet(executionConfig); - if (lastName != null && lastName.equals(stateDescriptor.getName())) { lastState.setCurrentNamespace(namespace); return (S) lastState; http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 71cccae..ca7cb48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -134,8 +134,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { "Incompatible assignment mode. Provided: " + mode + ", expected: " + partitionableListState.getAssignmentMode()); Preconditions.checkState( - partitionableListState.getPartitionStateSerializer(). - isCompatibleWith(stateDescriptor.getElementSerializer()), + stateDescriptor.getElementSerializer(). + canRestoreFrom(partitionableListState.getPartitionStateSerializer()), "Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() + ", found: " + partitionableListState.getPartitionStateSerializer()); } @@ -258,7 +258,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { registeredStates.put(listState.getName(), listState); } else { - Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith( + Preconditions.checkState(listState.getPartitionStateSerializer().canRestoreFrom( stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " + listState.getPartitionStateSerializer() + " is not compatible with " + stateMetaInfo.getStateSerializer()); http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java index 80bdacd..0d4b3c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java @@ -74,7 +74,7 @@ public class RegisteredBackendStateMetaInfo<N, S> { return stateSerializer; } - public boolean isCompatibleWith(RegisteredBackendStateMetaInfo<?, ?> other) { + public boolean canRestoreFrom(RegisteredBackendStateMetaInfo<?, ?> other) { if (this == other) { return true; @@ -94,8 +94,8 @@ public class RegisteredBackendStateMetaInfo<N, S> { return false; } - return (stateSerializer.isCompatibleWith(other.stateSerializer)) && - (namespaceSerializer.isCompatibleWith(other.namespaceSerializer) + return (stateSerializer.canRestoreFrom(other.stateSerializer)) && + (namespaceSerializer.canRestoreFrom(other.namespaceSerializer) // we also check if there is just a migration proxy that should be replaced by any real serializer || other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy); } http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index f3e4ec6..46ec5c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -141,7 +141,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateTable = newStateTable(newMetaInfo); stateTables.put(stateName, stateTable); } else { - if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) { + if (!newMetaInfo.canRestoreFrom(stateTable.getMetaInfo())) { throw new RuntimeException("Trying to access state using incompatible meta info, was " + stateTable.getMetaInfo() + " trying access with " + newMetaInfo); }
