[hotfix] Restore KeySerializer only once
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfb6a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfb6a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfb6a698

Branch: refs/heads/release-1.3
Commit: cfb6a6982cce89f76209d7c4bea4c9905fd5092a
Parents: 51fb7ed
Author: Till Rohrmann <[email protected]>
Authored: Thu May 18 16:41:15 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb6a698/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 8d3d8a0..6eb314b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		int numRegisteredKvStates = 0;
 		stateTables.clear();
 
+		boolean keySerializerRestored = false;
+
 		for (KeyedStateHandle keyedStateHandle : state) {
 
 			if (keyedStateHandle == null) {
@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				serializationProxy.read(inView);
 
-				// check for key serializer compatibility; this also reconfigures the
-				// key serializer to be compatible, if it is required and is possible
-				if (StateMigrationUtil.resolveCompatibilityResult(
+				if (!keySerializerRestored) {
+					// check for key serializer compatibility; this also reconfigures the
+					// key serializer to be compatible, if it is required and is possible
+					if (StateMigrationUtil.resolveCompatibilityResult(
 						serializationProxy.getKeySerializer(),
 						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
 						(TypeSerializer) keySerializer)
-					.isRequiresMigration()) {
+						.isRequiresMigration()) {
 
-					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
-						"Aborting now since state migration is currently not available");
-				}
+						// TODO replace with state migration; note that key hash codes need to remain the same after migration
+						throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+							"Aborting now since state migration is currently not available");
+					}
 
+					keySerializerRestored = true;
+				}
+
 				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 						serializationProxy.getStateMetaInfoSnapshots();
