[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294
 ] 

Jun Qin edited comment on FLINK-14942 at 8/12/20, 1:25 PM:
-----------------------------------------------------------

I have double-checked: when creating a new savepoint from an existing 
savepoint, if the state of an operator in the existing savepoint is 
untouched (i.e., neither removed nor modified), the SavepointMetadata of the new 
savepoint will contain the same path as the original metadata, which is a 
relative path in Flink 1.11. When you try to restore a job from the new 
savepoint and load the state of those untouched operators, it will fail with:
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
        ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 11 more
Caused by: java.io.FileNotFoundException: /Users/jqin/tmp/savepoints/deepcopy2/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
        at java.base/java.io.FileInputStream.open0(Native Method)
        at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
        at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
        at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        ... 15 more
{code}
For those untouched operators, a potential solution, I think, is to use the 
read/write API of the State Processor API library to re-create the state of 
those operators in the new savepoint. [~sjwiesman], what do you think? 
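
To make the failure mode concrete, here is a minimal sketch in plain Java. It is not Flink code: the class `RelativeHandleDemo`, its methods, and the file name `state-file` are hypothetical, and it assumes (consistent with the FileNotFoundException path above) that a relative state handle is resolved against the directory of the savepoint being restored. A shallow copy carries over only the relative handle, so the file is missing under the new directory; a deep copy also copies the referenced file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class RelativeHandleDemo {

    // Assumption: a relative handle is resolved against the directory of
    // the savepoint whose metadata is being read.
    static Path resolve(Path savepointDir, String relativeHandle) {
        return savepointDir.resolve(relativeHandle);
    }

    // Shallow copy: only the relative handle is carried into the new metadata.
    static String shallowCopy(String relativeHandle) {
        return relativeHandle;
    }

    // Deep copy: the referenced file is copied into the new savepoint
    // directory, so the same relative handle resolves there as well.
    static String deepCopy(Path sourceDir, Path targetDir, String relativeHandle)
            throws IOException {
        Files.createDirectories(targetDir);
        Files.copy(
                sourceDir.resolve(relativeHandle),
                targetDir.resolve(relativeHandle),
                StandardCopyOption.REPLACE_EXISTING);
        return relativeHandle;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("savepoints");
        Path source = Files.createDirectories(root.resolve("source"));
        Path shallow = Files.createDirectories(root.resolve("shallow"));
        Path deep = root.resolve("deep");

        // Hypothetical state file belonging to an untouched operator.
        String handle = "state-file";
        Files.write(source.resolve(handle), "keyed state".getBytes(StandardCharsets.UTF_8));

        // The shallow copy points at a file that does not exist in the new directory.
        System.out.println(Files.exists(resolve(shallow, shallowCopy(handle)))); // false

        // The deep copy brings the file along, so the handle resolves.
        System.out.println(Files.exists(resolve(deep, deepCopy(source, deep, handle)))); // true
    }
}
```

Re-creating the state through the read/write API would have the same effect as `deepCopy` here, in that the new savepoint ends up with its own copy of the state files.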



> State Processing API: add an option to make deep copy
> -----------------------------------------------------
>
>                 Key: FLINK-14942
>                 URL: https://issues.apache.org/jira/browse/FLINK-14942
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>            Reporter: Jun Qin
>            Assignee: Jun Qin
>            Priority: Major
>              Labels: usability
>             Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the 
> new savepoint contains references to the source savepoint. Here is what the 
> [State Processing API 
> doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html]
>  says: 
> bq. Note: When basing a new savepoint on existing state, the state processor 
> api makes a shallow copy of the pointers to the existing operators. This 
> means that both savepoints share state and one cannot be deleted without 
> corrupting the other!
> This JIRA requests an option to make a deep copy (instead of a shallow 
> copy) so that the new savepoint is self-contained. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)