rkhachatryan commented on code in PR #20152:
URL: https://github.com/apache/flink/pull/20152#discussion_r928325966
##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -87,9 +87,14 @@ protected <K> CheckpointableKeyedStateBackend<K> restore(
String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
ExecutionConfig executionConfig = env.getExecutionConfig();
+ env.getAsyncOperationsThreadPool();
+
ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory();
CheckpointableKeyedStateBackend<K> keyedStateBackend =
ChangelogBackendRestoreOperation.restore(
+ env.getJobID(),
+ env.getAsyncOperationsThreadPool(),
+ env.getTaskManagerInfo().getConfiguration(),
Review Comment:
Thanks for checking this, @fredia.
Do you mean changing the `StateChangelogStorageFactory` interface and passing
`ExecutionConfig` to `createStorageView` instead of `Configuration`?
That would require every new configuration parameter to be placed in
`ExecutionConfig`, which would be problematic, especially for non-bundled
`StateChangelogStorageFactory` implementations.
After #20160 (FLINK-28286), it should be `env.getJobConfiguration()`, ideally
merged with `env.getTaskManagerInfo().getConfiguration()`, right?
If so, I think this merging can either be implemented in this PR or in
FLINK-26372.
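
To make the merging idea concrete, here is a rough sketch of what I have in mind. It is not the actual implementation: plain `Map<String, String>` stands in for Flink's `Configuration`, the class and method names (`ConfigMergeSketch`, `merge`) are hypothetical, the option keys and values are only illustrative, and the precedence (job-level settings override TaskManager-level ones) is my assumption rather than something decided in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: plain maps stand in for Flink's Configuration objects.
final class ConfigMergeSketch {

    // Start from the TaskManager-level settings, then overlay job-level ones.
    // ASSUMPTION: job-level values win when the same key is set in both.
    static Map<String, String> merge(
            Map<String, String> taskManagerConf, Map<String, String> jobConf) {
        Map<String, String> merged = new HashMap<>(taskManagerConf);
        merged.putAll(jobConf);
        return merged;
    }

    public static void main(String[] args) {
        // Illustrative keys and values only.
        Map<String, String> tmConf = new HashMap<>();
        tmConf.put("state.backend.changelog.storage", "memory");
        tmConf.put("dstl.dfs.base-path", "file:///tmp/changelog");

        Map<String, String> jobConf = new HashMap<>();
        jobConf.put("state.backend.changelog.storage", "filesystem");

        // The merged view is what would be handed to the storage factory.
        System.out.println(merge(tmConf, jobConf));
    }
}
```

With a merged view like this, `createStorageView` could keep accepting a single `Configuration`, so non-bundled factories would still read their own options without any interface change.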