[
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294
]
Jun Qin edited comment on FLINK-14942 at 8/12/20, 1:19 PM:
-----------------------------------------------------------
I have double-checked: when a new savepoint is created from an existing
savepoint, if the state of an operator in the existing savepoint is untouched
(i.e., neither removed nor modified), the SavepointMetadata of the new
savepoint contains the same path as the original metadata, which is a
relative path in Flink 1.11. That relative path is then resolved against the
new savepoint's directory, where the state file does not exist. So when you try
to restore a job from the new savepoint and load the state of those untouched
operators, it fails with:
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.FileNotFoundException: /path/to/new/savepoint/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
    at java.base/java.io.FileInputStream.open0(Native Method)
    at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
    at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
    at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more
{code}
For those untouched operators, I think a potential solution is to use the
read/write API of the State Processor API library to re-create their state in
the new savepoint. [~sjwiesman], what do you think?
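To illustrate why a relative path breaks a shallow copy, and why re-writing the
state into the new savepoint would fix it, here is a minimal toy model that uses
only java.nio.file. It is not Flink code: the class SavepointCopyDemo, its
methods, and the one-line-per-file "_metadata" layout are all assumptions made
up for this sketch. The metadata stores state file names relative to the
savepoint directory, as described above for Flink 1.11.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;

/** Toy model (hypothetical, not Flink's format): a directory with a metadata file listing relative state file names. */
final class SavepointCopyDemo {

    static final String METADATA = "_metadata";

    /** Creates an empty temporary directory to hold the toy savepoints. */
    static Path newTempRoot() {
        try {
            return Files.createTempDirectory("savepoint-demo");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a source savepoint whose metadata references one state file by relative name. */
    static Path createSource(Path root, String stateFileName) {
        try {
            Path dir = Files.createDirectories(root.resolve("source"));
            Files.write(dir.resolve(stateFileName), "state-bytes".getBytes(StandardCharsets.UTF_8));
            Files.write(dir.resolve(METADATA), Collections.singletonList(stateFileName), StandardCharsets.UTF_8);
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Shallow copy: only the metadata is copied, so its relative entries now point into the target directory. */
    static Path shallowCopy(Path source, Path target) {
        try {
            Files.createDirectories(target);
            Files.copy(source.resolve(METADATA), target.resolve(METADATA));
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deep copy: the metadata plus every state file it references, making the target self-contained. */
    static Path deepCopy(Path source, Path target) {
        shallowCopy(source, target);
        try {
            for (String name : Files.readAllLines(source.resolve(METADATA), StandardCharsets.UTF_8)) {
                Files.copy(source.resolve(name), target.resolve(name));
            }
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A savepoint is restorable here if every relative entry resolves to an existing file in its own directory. */
    static boolean isRestorable(Path savepoint) {
        try {
            for (String name : Files.readAllLines(savepoint.resolve(METADATA), StandardCharsets.UTF_8)) {
                if (!Files.exists(savepoint.resolve(name))) {
                    return false;
                }
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = newTempRoot();
        Path source = createSource(root, "a623a314-9dd1-4d60-bc8f-d56816f55f03");
        System.out.println("shallow copy restorable: " + isRestorable(shallowCopy(source, root.resolve("shallow"))));
        System.out.println("deep copy restorable: " + isRestorable(deepCopy(source, root.resolve("deep"))));
    }
}
```

In this model the shallow copy is the analogue of the FileNotFoundException
above: the metadata entry resolves to a file under the new directory that was
never written. The deep copy is the analogue of re-creating the untouched
operators' state in the new savepoint.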
> State Processing API: add an option to make deep copy
> -----------------------------------------------------
>
> Key: FLINK-14942
> URL: https://issues.apache.org/jira/browse/FLINK-14942
> Project: Flink
> Issue Type: Improvement
> Components: API / State Processor
> Reporter: Jun Qin
> Assignee: Jun Qin
> Priority: Major
> Labels: usability
> Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the
> new savepoint contains references to the source savepoint. As the
> [State Processing API
> doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html]
> says:
> bq. Note: When basing a new savepoint on existing state, the state processor
> api makes a shallow copy of the pointers to the existing operators. This
> means that both savepoints share state and one cannot be deleted without
> corrupting the other!
> This JIRA requests an option to make a deep copy (instead of a shallow
> copy) so that the new savepoint is self-contained.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)