[ https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15512838#comment-15512838 ]

ASF GitHub Bot commented on FLINK-4603:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2533#discussion_r80010321
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -266,18 +265,20 @@ public void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exc
                        for (int i = 0; i < numKvStates; ++i) {
                                String stateName = inView.readUTF();
     
    -                           ObjectInputStream ois = new ObjectInputStream(inView);
    +                           TypeSerializer namespaceSerializer =
    +                                           InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
    +                           TypeSerializer stateSerializer =
    +                                           InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
     
    -                           TypeSerializer namespaceSerializer = (TypeSerializer) ois.readObject();
    -                           TypeSerializer stateSerializer = (TypeSerializer) ois.readObject();
    -                           StateTable<K, ?, ?> stateTable = new StateTable(stateSerializer,
    +                           StateTable<K, ?, ?> stateTable = new StateTable(
    +                                           stateSerializer,
                                                namespaceSerializer,
                                                keyGroupRange);
                                stateTables.put(stateName, stateTable);
                                kvStatesById.put(i, stateName);
                        }
     
    -                   for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
    +                   for (int keyGroupIndex = keyGroupRange.getStartKeyGroup();  keyGroupIndex <= keyGroupRange.getEndKeyGroup(); ++keyGroupIndex) {
    --- End diff --
    
    I think I wanted to break this line because it exceeds the line limit, but
    then decided against it because IntelliJ messed up the formatting of for
    loops. Nothing wrong there at all.


> KeyedStateBackend cannot restore user code classes
> --------------------------------------------------
>
>                 Key: FLINK-4603
>                 URL: https://issues.apache.org/jira/browse/FLINK-4603
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> A user reported that he cannot restore keyed state which contains user code 
> classes. I suspect that we don't use the user code class loader to 
> deserialize the state.
> The solution seems to be to forward the user code class loader to the 
> {{KeyedStateBackends}} when restoring state.
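
For illustration only, here is a minimal sketch of the general technique the issue describes: resolving classes against an explicitly supplied class loader during Java deserialization. It is not Flink's implementation. The names UserCodeDeserialization, ClassLoaderObjectInputStream, serialize and deserialize are hypothetical, and the class's own class loader stands in for a real user code class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical helper for illustration; not part of Flink. */
final class UserCodeDeserialization {

    /** ObjectInputStream that resolves classes against a caller-supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied loader without initializing it.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution, which also covers primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Reads one object from the stream; the caller keeps ownership of the stream. */
    static Object deserialize(InputStream in, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ClassLoaderObjectInputStream(in, classLoader);
        return ois.readObject();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a user code class loader (assumption for this sketch).
        ClassLoader userCodeClassLoader = UserCodeDeserialization.class.getClassLoader();
        byte[] bytes = serialize("some state");
        Object restored = deserialize(new ByteArrayInputStream(bytes), userCodeClassLoader);
        System.out.println(restored);
    }
}
```

A plain ObjectInputStream resolves classes through the class loader of the nearest calling code on the stack, which typically cannot see classes that exist only in a user jar. Passing the loader explicitly, as in the sketch, avoids that dependency.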



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
