Repository: flink Updated Branches: refs/heads/master 9a64d50f0 -> e8c909503
[hotfix] Restore KeySerializer only once Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef6f7b60 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef6f7b60 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef6f7b60 Branch: refs/heads/master Commit: ef6f7b605728545eb8196e655db23a43853e0663 Parents: 9a64d50 Author: Till Rohrmann <[email protected]> Authored: Thu May 18 16:41:15 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu May 18 23:19:28 2017 +0200 ---------------------------------------------------------------------- .../state/heap/HeapKeyedStateBackend.java | 22 +++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ef6f7b60/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 8d3d8a0..6eb314b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { int numRegisteredKvStates = 0; stateTables.clear(); + boolean keySerializerRestored = false; + for (KeyedStateHandle keyedStateHandle : state) { if (keyedStateHandle == null) { @@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.read(inView); - // check for key serializer compatibility; this also reconfigures the - // key serializer to be compatible, if it is required and is possible - if (StateMigrationUtil.resolveCompatibilityResult( + if (!keySerializerRestored) { + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( serializationProxy.getKeySerializer(), TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, serializationProxy.getKeySerializerConfigSnapshot(), (TypeSerializer) keySerializer) - .isRequiresMigration()) { + .isRequiresMigration()) { - // TODO replace with state migration; note that key hash codes need to remain the same after migration - throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + - "Aborting now since state migration is currently not available"); - } + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); + } + keySerializerRestored = true; + } + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
