[
https://issues.apache.org/jira/browse/FLINK-26306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496196#comment-17496196
]
Roman Khachatryan commented on FLINK-26306:
-------------------------------------------
Theoretically, I think there are the following ways to solve this problem:
1. Batch deletions and leave one thread idle (e.g. group 1K handles into 10 big
batches, so an 11-thread IO pool keeps one thread free)
2. Spread changelog materialization across multiple checkpoints; i.e.
materialize different tasks at different times
3. Use fork-join pool and rely on its task ordering (IIUC the implementation)
4. Use separate thread pools
5. Use unbounded thread pool
(1) seems like the way to go to me.
(2) is not guaranteed to help and is limited to the changelog backend.
(3) relies on fork-join pool implementation details.
(4) and (5) can be quickly ruled out because they remove back-pressure completely.
(1) leaves one thread free for new checkpoints initially. If the rest don't
keep up with deletion, then the next spike will consume the remaining capacity,
and checkpointing will be back-pressured.
The size/number of batches should be determined by the pool itself, so we'd
need to wrap the original IO executor.
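To make option (1) concrete, here is a minimal sketch of such a wrapper. Note this is a hypothetical illustration, not Flink's actual SharedStateRegistryImpl or executor API: the class name, the maxBatches parameter, and the executeAll method are all invented for the example. It groups N discard Runnables into a bounded number of batch tasks before handing them to the underlying IO executor, so most of the pool stays available for new checkpoint work.

```java
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical sketch: wrap the shared IO executor so a spike of per-handle
// discard Runnables occupies at most maxBatches pool threads at once.
class BatchingExecutor {
    private final Executor delegate;
    private final int maxBatches; // e.g. poolSize - 1, keeping one thread free

    BatchingExecutor(Executor delegate, int maxBatches) {
        this.delegate = delegate;
        this.maxBatches = maxBatches;
    }

    /** Submits the given tasks as at most maxBatches combined Runnables. */
    void executeAll(List<Runnable> tasks) {
        if (tasks.isEmpty()) {
            return;
        }
        // Ceiling division: spread tasks evenly over the allowed batches.
        int batchSize = (tasks.size() + maxBatches - 1) / maxBatches;
        for (int start = 0; start < tasks.size(); start += batchSize) {
            List<Runnable> batch =
                tasks.subList(start, Math.min(start + batchSize, tasks.size()));
            delegate.execute(() -> batch.forEach(Runnable::run));
        }
    }
}
```

With 1K handles and maxBatches = 10, only 10 Runnables enter the queue instead of 1000, so an 11-thread pool always has a thread left for initializing the next checkpoint, at least until batches themselves pile up.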
WDYT [~pnowojski], [[email protected]], [~ym], [~yunta] ?
> Triggered checkpoints can be delayed by discarding shared state
> ---------------------------------------------------------------
>
> Key: FLINK-26306
> URL: https://issues.apache.org/jira/browse/FLINK-26306
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.0, 1.14.3
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Fix For: 1.15.0
>
>
> Quick note: CheckpointCleaner is not involved here.
> When a checkpoint is subsumed, SharedStateRegistry schedules its unused
> shared state for async deletion. It uses common IO pool for this and adds a
> Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).
> When a checkpoint is started, CheckpointCoordinator uses the same thread pool
> to initialize the location for it. (see
> CheckpointCoordinator.initializeCheckpoint)
> The thread pool is of fixed size
> ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size];
> by default, the number of CPU cores) and uses a FIFO queue for tasks.
> When there is a spike in state deletion, the next checkpoint is delayed
> waiting for an available IO thread.
> Back-pressure seems reasonable here (similar to CheckpointCleaner); however,
> this shared state deletion could be spread across multiple subsequent
> checkpoints, not necessarily the next one.
> ----
> I believe the issue is a pre-existing one, but it particularly affects the
> changelog state backend, because 1) such deletion spikes are likely there; and
> 2) its workloads are latency-sensitive.
> In the tests, checkpoint duration grows from seconds to minutes immediately
> after the materialization.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)