[
https://issues.apache.org/jira/browse/FLINK-25400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784691#comment-17784691
]
Jinzhong Li commented on FLINK-25400:
-------------------------------------
I think this issue still exits in 1.19.
The task configuration of SavepointEnvironment should be passed to
SavepointTaskManagerRuntimeInfo,
so that SavepointEnvironment.getTaskManagerInfo().getConfiguration() can obtain
the task configuration.
If my analysis is correct, I want to fix this bug.
> RocksDBStateBackend configurations does not work with SavepointEnvironment
> --------------------------------------------------------------------------
>
> Key: FLINK-25400
> URL: https://issues.apache.org/jira/browse/FLINK-25400
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.12.2
> Reporter: wuzhiyu
> Priority: Major
>
> Hi~
> I'm trying to use flink-state-processor-api to do state migrations by reading
> states from an existing savepoint, and writing them into a new savepoint
> after certain transformations.
> However, the reading rate does not meet my expectation.
> When I tried to tune RocksDB by enabling RocksDB native metrics, I found it
> did not work.
> So I did some debug, I found when the job is running under a
> SavepointEnvironment, no RocksDBStatebackend configurations will be passed to
> RocksDBStateBackend.
> The whole process is described as below (code demonstrated is under version
> release-1.12.2):
> First, when
> org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is
> invoked:
>
> {code:java}
> // org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
> private StateBackend createStateBackend() throws Exception {
> final StateBackend fromApplication =
> configuration.getStateBackend(getUserCodeClassLoader());
> return StateBackendLoader.fromApplicationOrConfigOrDefault(
> fromApplication,
> getEnvironment().getTaskManagerInfo().getConfiguration(),
> getUserCodeClassLoader(),
> LOG); {code}
> *getEnvironment()* returns a SavepointEnvironment instance.
>
> And then
> *org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo*
> is invoked, it returns a new
> *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo* instance.
>
> {code:java}
> // org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
> @Override
> public TaskManagerRuntimeInfo getTaskManagerInfo() {
> return new SavepointTaskManagerRuntimeInfo(getIOManager());
> } {code}
>
> At last,
> *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration*
> is invoked. It returns an empty configuration, which means all
> configurations will be lost.
> {code:java}
> //
> org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
> @Override
> public Configuration getConfiguration() {
> return new Configuration();
> } {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)