[ https://issues.apache.org/jira/browse/FLINK-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15552273#comment-15552273 ]
ASF GitHub Bot commented on FLINK-4731:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82216509

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -259,28 +263,29 @@ public void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exc

             try {
    -            fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream();
    +            fsDataInputStream = keyGroupsHandle.openInputStream();
                 cancelStreamRegistry.registerClosable(fsDataInputStream);
                 DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);

                 int numKvStates = inView.readShort();

    -            Map<Integer, String> kvStatesById = new HashMap<>(numKvStates);
    -
                 for (int i = 0; i < numKvStates; ++i) {
                     String stateName = inView.readUTF();

    -                TypeSerializer namespaceSerializer =
    +                TypeSerializer<?> namespaceSerializer =
                             InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
    -                TypeSerializer stateSerializer =
    +                TypeSerializer<?> stateSerializer =
                             InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);

    -                StateTable<K, ?, ?> stateTable = new StateTable(stateSerializer,
    -                        namespaceSerializer,
    -                        keyGroupRange);
    -                stateTables.put(stateName, stateTable);
    -                kvStatesById.put(i, stateName);
    +                StateTable<K, ?, ?> stateTable = stateTables.get(stateName);
    +
    +                if (null == stateTable) {
    --- End diff --

    Let's add a comment here that it is important to check for previously restored state first.


> HeapKeyedStateBackend restoring broken for scale-in
> ---------------------------------------------------
>
>                 Key: FLINK-4731
>                 URL: https://issues.apache.org/jira/browse/FLINK-4731
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Restoring the HeapKeyedStateBackend is broken in case that parallelism is
> reduced. The restore method is overwriting previously restored state.
> We should also add scale-in testing to the RescalingITCase.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
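
For readers following the thread, here is a minimal sketch of why the lookup has to come before the creation. It is not the Flink code: plain `java.util` maps stand in for `StateTable` and for the key-group handles, and `RestoreSketch`, `restore`, and the `handles` parameter are hypothetical names chosen for illustration. On scale-in one backend restores from several handles, so the same state name can show up more than once; creating a fresh table each time would throw away what an earlier handle already restored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the restore loop; not part of Flink.
final class RestoreSketch {

    /**
     * Each element of {@code handles} models one key-group handle:
     * state name -> (key -> value). Returns the merged state tables.
     */
    static Map<String, Map<Integer, String>> restore(
            List<Map<String, Map<Integer, String>>> handles) {

        Map<String, Map<Integer, String>> stateTables = new HashMap<>();

        for (Map<String, Map<Integer, String>> handle : handles) {
            for (Map.Entry<String, Map<Integer, String>> entry : handle.entrySet()) {
                // Important: check for a previously restored table first.
                // Unconditionally creating a new one here would overwrite
                // the entries restored from an earlier handle.
                Map<Integer, String> table = stateTables.get(entry.getKey());
                if (null == table) {
                    table = new HashMap<>();
                    stateTables.put(entry.getKey(), table);
                }
                table.putAll(entry.getValue());
            }
        }
        return stateTables;
    }

    public static void main(String[] args) {
        // Two handles that both carry entries for the state "counter".
        Map<String, Map<Integer, String>> first = new HashMap<>();
        first.put("counter", new HashMap<>());
        first.get("counter").put(1, "a");

        Map<String, Map<Integer, String>> second = new HashMap<>();
        second.put("counter", new HashMap<>());
        second.get("counter").put(2, "b");

        List<Map<String, Map<Integer, String>>> handles = new ArrayList<>();
        handles.add(first);
        handles.add(second);

        System.out.println(restore(handles));
    }
}
```

With the get-then-create check, the entries from both handles end up in the same table; replacing the check with an unconditional `put` of a new map would leave only the entries from the last handle processed.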