[
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294
]
Jun Qin commented on FLINK-14942:
---------------------------------
I have double-checked: when you create a new savepoint from an existing
savepoint and the state of an operator in the existing savepoint is left
untouched (i.e., neither removed nor modified), the SavepointMetadata of the new
savepoint contains the same path as the original metadata, which is a relative
path in Flink 1.11. When you then try to restore a job from the new savepoint
and load the state of those untouched operators, it fails with:
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.FileNotFoundException: /path/to/new/savepoint/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
    at java.base/java.io.FileInputStream.open0(Native Method)
    at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
    at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
    at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more
{code}
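The FileNotFoundException at the bottom of the trace is what one would expect
when a relative file reference is resolved against a directory that never
received the file. The following is a minimal plain-Java sketch of that
mechanism only. It does not use any Flink classes, and the class name
RelativeHandleDemo, its methods, and the file name "operator-state-file" are
hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class RelativeHandleDemo {

    /** Resolves a relative state-file reference against the savepoint directory being loaded. */
    public static Path resolve(Path savepointDir, String relativeHandle) {
        return savepointDir.resolve(relativeHandle);
    }

    /** Returns true if the resolved reference points to a file that actually exists. */
    public static boolean canRestore(Path savepointDir, String relativeHandle) {
        return Files.exists(resolve(savepointDir, relativeHandle));
    }

    public static void main(String[] args) throws IOException {
        // Two directories standing in for the source savepoint and the new savepoint.
        Path sourceSavepoint = Files.createTempDirectory("source-savepoint");
        Path newSavepoint = Files.createTempDirectory("new-savepoint");

        // Hypothetical state file; it is written only into the source savepoint.
        String handle = "operator-state-file";
        Files.write(sourceSavepoint.resolve(handle), new byte[] {1, 2, 3});

        // The same relative reference is copied into the new savepoint's metadata
        // (a shallow copy), so it is now resolved against the new directory.
        System.out.println("restorable from source savepoint: " + canRestore(sourceSavepoint, handle));
        System.out.println("restorable from new savepoint:    " + canRestore(newSavepoint, handle));
    }
}
```

In this sketch the reference resolves correctly only against the directory
that physically contains the file, which mirrors the lookup under
/path/to/new/savepoint/ in the trace above.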
For those untouched operators, a potential solution, I think, is to use the
read/write API of the State Processor API library to re-create the state of
those operators in the new savepoint. [~sjwiesman], what do you think?
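As a rough filesystem analogy of what a self-contained ("deep copy") savepoint
means, the sketch below copies every file that a set of relative references
points to from a source directory into a target directory, so that the same
references resolve in the target as well. This is not the State Processor API
and not how Flink would necessarily implement it: the suggestion above is to
read and re-write the state through that library. The class
SelfContainedCopyDemo, the method copyReferencedFiles, and the file name are
assumptions made purely for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.List;

class SelfContainedCopyDemo {

    /**
     * Copies each file referenced by a relative handle from sourceDir into targetDir,
     * keeping the same relative name, so targetDir no longer depends on sourceDir.
     */
    public static void copyReferencedFiles(Path sourceDir, Path targetDir, List<String> relativeHandles)
            throws IOException {
        for (String handle : relativeHandles) {
            Path from = sourceDir.resolve(handle);
            Path to = targetDir.resolve(handle);
            Path parent = to.getParent();
            if (parent != null) {
                // Create intermediate directories in case the handle contains sub-paths.
                Files.createDirectories(parent);
            }
            Files.copy(from, to, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path sourceSavepoint = Files.createTempDirectory("source-savepoint");
        Path newSavepoint = Files.createTempDirectory("new-savepoint");

        // Hypothetical state file of an untouched operator.
        String handle = "operator-state-file";
        Files.write(sourceSavepoint.resolve(handle), new byte[] {1, 2, 3});

        copyReferencedFiles(sourceSavepoint, newSavepoint, Collections.singletonList(handle));

        // After the copy, the relative reference also resolves inside the new directory.
        System.out.println("file present in new savepoint: " + Files.exists(newSavepoint.resolve(handle)));
    }
}
```

With the files duplicated, deleting the source directory would no longer
affect the target, which is the property the issue asks for.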
> State Processing API: add an option to make deep copy
> -----------------------------------------------------
>
> Key: FLINK-14942
> URL: https://issues.apache.org/jira/browse/FLINK-14942
> Project: Flink
> Issue Type: Improvement
> Components: API / State Processor
> Reporter: Jun Qin
> Assignee: Jun Qin
> Priority: Major
> Labels: usability
> Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the
> new savepoint contains references to the source savepoint. Here is what
> the [State Processing API
> doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html]
> says:
> bq. Note: When basing a new savepoint on existing state, the state processor
> api makes a shallow copy of the pointers to the existing operators. This
> means that both savepoints share state and one cannot be deleted without
> corrupting the other!
> This JIRA is to request an option to have a deep copy (instead of shallow
> copy) such that the new savepoint is self-contained.