wuzhiyu created FLINK-25400:
-------------------------------
Summary: RocksDBStateBackend configurations does not work with
SavepointEnvironment
Key: FLINK-25400
URL: https://issues.apache.org/jira/browse/FLINK-25400
Project: Flink
Issue Type: Bug
Components: API / State Processor
Affects Versions: 1.12.2
Reporter: wuzhiyu
Hi~
I'm trying to use flink-state-processor-api to migrate state by reading
state from an existing savepoint and writing it to a new savepoint after
certain transformations.
However, the read rate does not meet my expectations.
When I tried to tune RocksDB by enabling RocksDB native metrics, I found that
the setting did not take effect.
After some debugging, I found that when the job runs under a
SavepointEnvironment, no RocksDBStateBackend configuration is passed to
RocksDBStateBackend.
The whole process is described below (the code shown is from
release-1.12.2):
First,
org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is
invoked:
{code:java}
// org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
private StateBackend createStateBackend() throws Exception {
    final StateBackend fromApplication =
            configuration.getStateBackend(getUserCodeClassLoader());

    // The configuration handed to the loader comes from the environment's
    // TaskManagerRuntimeInfo.
    return StateBackendLoader.fromApplicationOrConfigOrDefault(
            fromApplication,
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getUserCodeClassLoader(),
            LOG);
} {code}
*getEnvironment()* returns a SavepointEnvironment instance.
Then
*org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo* is
invoked, which returns a new
*org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo* instance.
{code:java}
// org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    return new SavepointTaskManagerRuntimeInfo(getIOManager());
} {code}
Finally,
*org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration*
is invoked. It returns an empty configuration, so every configured option is
lost before it reaches the state backend.
{code:java}
// org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
@Override
public Configuration getConfiguration() {
    return new Configuration();
} {code}
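To make the effect of the call chain above easier to see, here is a minimal, self-contained sketch that does not use any Flink classes. Every name in it (ConfigLossSketch, Config, RuntimeInfo, EmptyRuntimeInfo, ForwardingRuntimeInfo, metricEnabled) is a hypothetical stand-in, and the option key is used only as an example string. It assumes the backend reads its options solely from the configuration returned by the runtime info, and it contrasts a runtime info that always returns an empty configuration with one that hands back the configuration it was given. The second variant is only an illustration of the difference, not a proposed patch.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigLossSketch {

    /** Hypothetical stand-in for a configuration object: a plain key/value map. */
    static final class Config {
        private final Map<String, String> values = new HashMap<>();

        Config set(String key, String value) {
            values.put(key, value);
            return this;
        }

        boolean getBoolean(String key, boolean defaultValue) {
            String raw = values.get(key);
            return raw == null ? defaultValue : Boolean.parseBoolean(raw);
        }
    }

    /** Hypothetical stand-in for the runtime info that exposes the configuration. */
    interface RuntimeInfo {
        Config getConfiguration();
    }

    /** Mirrors the reported behavior: always returns a fresh, empty configuration. */
    static final class EmptyRuntimeInfo implements RuntimeInfo {
        @Override
        public Config getConfiguration() {
            return new Config();
        }
    }

    /** Illustrative alternative (an assumption): returns the configuration it was given. */
    static final class ForwardingRuntimeInfo implements RuntimeInfo {
        private final Config config;

        ForwardingRuntimeInfo(Config config) {
            this.config = config;
        }

        @Override
        public Config getConfiguration() {
            return config;
        }
    }

    /** Example option key, used here only as an illustrative string. */
    static final String METRIC_KEY = "state.backend.rocksdb.metrics.block-cache-usage";

    /** Stand-in for a backend reading one option; falls back to false when the key is absent. */
    static boolean metricEnabled(RuntimeInfo info) {
        return info.getConfiguration().getBoolean(METRIC_KEY, false);
    }

    public static void main(String[] args) {
        Config userConfig = new Config().set(METRIC_KEY, "true");

        // The user's setting never reaches the backend through the empty variant.
        System.out.println("empty runtime info:      " + metricEnabled(new EmptyRuntimeInfo()));
        System.out.println("forwarding runtime info: "
                + metricEnabled(new ForwardingRuntimeInfo(userConfig)));
    }
}
```

In this sketch the empty variant falls back to the default for every key, which matches the symptom described above: the option is set by the user but never observed by the backend.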