[
https://issues.apache.org/jira/browse/FLINK-26584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504236#comment-17504236
]
Matthias Schwalbe commented on FLINK-26584:
-------------------------------------------
- SavepointTaskManagerRuntimeInfo.getConfiguration() has an empty
implementation [1] (it always returns an empty Configuration), so only
JobManagerCheckpointStorage is ever considered as checkpoint storage
- Simple solution:
- add a Configuration field and initialize it in the constructor
{quote}
private final Configuration configuration;

public SavepointTaskManagerRuntimeInfoEx(IOManager ioManager, Configuration configuration) {
    super(ioManager);
    this.configuration = configuration;
}

@Override
public Configuration getConfiguration() {
    return new Configuration(this.configuration);
}
{quote}
- pass the configuration through in
org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
{quote}
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    return new SavepointTaskManagerRuntimeInfo(getIOManager(), this.configuration);
}
{quote}
- this makes it possible to configure state.checkpoint-storage: filesystem
[1] SavepointTaskManagerRuntimeInfo.getConfiguration()
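
To illustrate why passing the configuration through matters, here is a minimal, self-contained sketch. It does not use Flink classes: SketchConfig, SketchRuntimeInfo and resolveStorage are hypothetical stand-ins, and the selection rule is a simplification of the real checkpoint storage loading (assumed here: no explicit setting means the "jobmanager" storage). The path file:///tmp/checkpoints is only an example value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's Configuration: a plain key/value map.
final class SketchConfig {
    private final Map<String, String> entries = new HashMap<>();

    SketchConfig() {}

    // Copy constructor, mirroring the defensive copy in getConfiguration().
    SketchConfig(SketchConfig other) {
        entries.putAll(other.entries);
    }

    SketchConfig set(String key, String value) {
        entries.put(key, value);
        return this;
    }

    String get(String key, String fallback) {
        return entries.getOrDefault(key, fallback);
    }
}

// Hypothetical stand-in for the runtime info object handed to the task.
final class SketchRuntimeInfo {
    private final SketchConfig configuration;

    SketchRuntimeInfo(SketchConfig configuration) {
        this.configuration = configuration;
    }

    // Returns a copy so callers cannot mutate the shared configuration.
    SketchConfig getConfiguration() {
        return new SketchConfig(configuration);
    }
}

final class ConfigPropagationSketch {
    // Simplified selection rule (assumption): without an explicit
    // "state.checkpoint-storage" entry, fall back to "jobmanager".
    static String resolveStorage(SketchRuntimeInfo info) {
        return info.getConfiguration().get("state.checkpoint-storage", "jobmanager");
    }

    public static void main(String[] args) {
        // Behaviour described in the issue: the configuration is always empty.
        SketchRuntimeInfo before = new SketchRuntimeInfo(new SketchConfig());

        // Behaviour with the proposed fix: user settings reach the task.
        SketchRuntimeInfo after = new SketchRuntimeInfo(
                new SketchConfig()
                        .set("state.checkpoint-storage", "filesystem")
                        .set("state.checkpoints.dir", "file:///tmp/checkpoints"));

        System.out.println("empty configuration  -> " + resolveStorage(before));
        System.out.println("passed configuration -> " + resolveStorage(after));
    }
}
```

With an empty configuration the sketch can only ever select the in-memory default, which corresponds to the behaviour reported in this issue. Once the settings are passed through, the filesystem storage becomes selectable.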
> State Processor API fails to write savepoints exceeding 5MB
> -----------------------------------------------------------
>
> Key: FLINK-26584
> URL: https://issues.apache.org/jira/browse/FLINK-26584
> Project: Flink
> Issue Type: Improvement
> Components: API / State Processor
> Affects Versions: 1.13.0, 1.14.0
> Reporter: Matthias Schwalbe
> Priority: Critical
>
> - WritableSavepoint.write(…) falls back to JobManagerCheckpointStorage which
> restricts savepoint size to 5MiB
> - See relevant exception stack here [1]
> - This is because SavepointTaskManagerRuntimeInfo.getConfiguration()
> always returns empty Configuration, hence
> - Neither “state.checkpoint-storage” nor “state.checkpoints.dir” are/can
> be configured
> - ‘fix’: provide SavepointTaskManagerRuntimeInfo.getConfiguration() with
> a meaningful implementation and set configuration in
> SavepointEnvironment.getTaskManagerInfo()
>
> [1]
> 8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR BatchTask - Error in task code: MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
> java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
> at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
> at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
> at org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
> at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
> at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:107)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
> at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:61)
> at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:141)
> at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:121)
> at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75)
> at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:87)
> at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
> at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
> at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> at java.util.concurrent.FutureTask.run(FutureTask.java)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
> ... 14 more
>
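
The numbers in the exception above can be checked with a few lines of arithmetic. The sketch below is not Flink code: MemoryStateSizeCheck and its methods are hypothetical names, and the comparison only mirrors the rule stated in the message (state larger than maxSize is rejected).

```java
// Hypothetical helper reproducing the size comparison from the exception message.
final class MemoryStateSizeCheck {
    // maxSize from the stack trace: 5 MiB expressed in bytes (5242880).
    static final long MAX_SIZE_BYTES = 5L * 1024 * 1024;

    // State is accepted only if it does not exceed the limit.
    static boolean fits(long sizeBytes) {
        return sizeBytes <= MAX_SIZE_BYTES;
    }

    // Converts a byte count to MiB for readability.
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long reportedSize = 180_075_318L; // Size from the stack trace
        System.out.printf(
                "limit=%d bytes, state=%.1f MiB, fits=%b%n",
                MAX_SIZE_BYTES, toMiB(reportedSize), fits(reportedSize));
    }
}
```

The reported state is roughly 172 MiB, far above the 5 MiB limit, so a larger in-memory limit would not be a practical workaround. The savepoint has to be written through a file-based checkpoint storage.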
--
This message was sent by Atlassian Jira
(v8.20.1#820001)