[hotfix] Restore KeySerializer only once

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfb6a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfb6a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfb6a698

Branch: refs/heads/release-1.3
Commit: cfb6a6982cce89f76209d7c4bea4c9905fd5092a
Parents: 51fb7ed
Author: Till Rohrmann <[email protected]>
Authored: Thu May 18 16:41:15 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfb6a698/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 8d3d8a0..6eb314b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                int numRegisteredKvStates = 0;
                stateTables.clear();
 
+               boolean keySerializerRestored = false;
+
                for (KeyedStateHandle keyedStateHandle : state) {
 
                        if (keyedStateHandle == null) {
@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                serializationProxy.read(inView);
 
-                               // check for key serializer compatibility; this 
also reconfigures the
-                               // key serializer to be compatible, if it is 
required and is possible
-                               if 
(StateMigrationUtil.resolveCompatibilityResult(
+                               if (!keySerializerRestored) {
+                                       // check for key serializer 
compatibility; this also reconfigures the
+                                       // key serializer to be compatible, if 
it is required and is possible
+                                       if 
(StateMigrationUtil.resolveCompatibilityResult(
                                                
serializationProxy.getKeySerializer(),
                                                
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
                                                
serializationProxy.getKeySerializerConfigSnapshot(),
                                                (TypeSerializer) keySerializer)
-                                       .isRequiresMigration()) {
+                                               .isRequiresMigration()) {
 
-                                       // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
-                                       throw new RuntimeException("The new key 
serializer is not compatible to read previous keys. " +
-                                               "Aborting now since state 
migration is currently not available");
-                               }
+                                               // TODO replace with state 
migration; note that key hash codes need to remain the same after migration
+                                               throw new RuntimeException("The 
new key serializer is not compatible to read previous keys. " +
+                                                       "Aborting now since 
state migration is currently not available");
+                                       }
 
+                                       keySerializerRestored = true;
+                               }
+                               
                                
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
                                                
serializationProxy.getStateMetaInfoSnapshots();
 

Reply via email to