[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294
 ] 

Jun Qin edited comment on FLINK-14942 at 8/12/20, 1:19 PM:
-----------------------------------------------------------

I have double-checked: when creating a new savepoint from an existing 
savepoint, if the state of an operator in the existing savepoint is not 
touched (i.e., neither removed nor modified), the SavepointMetadata of the new 
savepoint contains the same path as the original metadata, which is a 
relative path in Flink 1.11. That relative path then ends up being looked up 
under the new savepoint's directory, where the file does not exist, so 
restoring a job from the new savepoint and loading the state of those 
untouched operators fails with:

 
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.FileNotFoundException: /path/to/new/savepoint/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
    at java.base/java.io.FileInputStream.open0(Native Method)
    at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
    at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
    at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more
{code}
 

For those untouched operators, I think a potential solution is to use the 
read/write API of the State Processor API library to re-create their state 
in the new savepoint. [~sjwiesman], what do you think? 
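
To make the difference between carrying over a reference and re-creating the 
state concrete, here is a minimal sketch that uses only java.nio and none of 
Flink's classes. It assumes the relative reference is resolved against the 
directory of whichever savepoint is being restored, which is what the 
FileNotFoundException path in the trace suggests. The class and method names 
(SavepointCopySketch, resolve, shallowCopy, deepCopy) are hypothetical, and 
the file name is only a stand-in borrowed from the trace.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only: this is NOT how Flink stores or loads savepoints.
public class SavepointCopySketch {

    /** Resolves a relative state-file reference against a savepoint directory. */
    static Path resolve(Path savepointDir, String relativeRef) {
        return savepointDir.resolve(relativeRef);
    }

    /** Shallow copy: only the reference is carried over; no state file is written. */
    static String shallowCopy(String relativeRef) {
        return relativeRef;
    }

    /** Deep copy: the referenced file is also copied into the new savepoint directory. */
    static String deepCopy(Path oldDir, Path newDir, String relativeRef) throws IOException {
        Files.copy(resolve(oldDir, relativeRef), resolve(newDir, relativeRef));
        return relativeRef;
    }

    public static void main(String[] args) throws IOException {
        Path oldDir = Files.createTempDirectory("old-savepoint");
        Path newDir = Files.createTempDirectory("new-savepoint");

        // Stand-in for the state file of an untouched operator in the old savepoint.
        String ref = "a623a314-9dd1-4d60-bc8f-d56816f55f03";
        Files.write(resolve(oldDir, ref), "state".getBytes(StandardCharsets.UTF_8));

        // The reference alone does not make the file available under the new directory.
        String shallow = shallowCopy(ref);
        System.out.println("shallow copy readable: " + Files.exists(resolve(newDir, shallow)));

        // After copying the file, the same relative reference resolves in the new directory.
        String deep = deepCopy(oldDir, newDir, ref);
        System.out.println("deep copy readable: " + Files.exists(resolve(newDir, deep)));
    }
}
```

Running main prints "shallow copy readable: false" followed by "deep copy 
readable: true". Re-creating the state through the read/write API would have 
the same effect as the deep copy here: the new savepoint ends up owning its 
files.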


> State Processing API: add an option to make deep copy
> -----------------------------------------------------
>
>                 Key: FLINK-14942
>                 URL: https://issues.apache.org/jira/browse/FLINK-14942
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>            Reporter: Jun Qin
>            Assignee: Jun Qin
>            Priority: Major
>              Labels: usability
>             Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the 
> new savepoint contains references to the source savepoint. Here is what the 
> [State Processing API 
> doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html]
>  says: 
> bq. Note: When basing a new savepoint on existing state, the state processor 
> api makes a shallow copy of the pointers to the existing operators. This 
> means that both savepoints share state and one cannot be deleted without 
> corrupting the other!
> This JIRA requests an option to make a deep copy (instead of a shallow 
> copy) so that the new savepoint is self-contained. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
