[ https://issues.apache.org/jira/browse/FLINK-26306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496567#comment-17496567 ]

Piotr Nowojski commented on FLINK-26306:
----------------------------------------

I've changed the type of this issue to an improvement.

> 2. Spread changelog materialization across multiple checkpoints; i.e. 
> materialize different tasks at different times

Can you [~roman] elaborate on why that would help? Is it because the 
materialised parts of the changelog checkpoints are causing those deletion 
spikes? If so, why is that the case? And why only because of the "materialised 
parts"?

Maybe we should think about a fairer thread pool for async jobs? For example, 
every async IO job could be assigned an id/key, and each id/key would have its 
own queue of tasks to perform. Based on that we could implement all kinds of 
fancy priority schemes, but we could start with something as simple as going 
round-robin through the individual per-id/key queues when polling for the next 
task to execute. This could be generic and flexible enough to be re-used in 
other use cases (I was thinking about something like that for the TMs' IO 
executor in the past). A rough sketch follows below.
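
To make the idea concrete, a rough sketch (all names here are made up, this is 
not existing Flink code): one FIFO queue per id/key, and a fixed number of 
workers that poll the queues round-robin, so a burst of tasks under one key 
cannot starve tasks submitted under another key.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
 * Sketch of a "fair" IO executor: each id/key gets its own FIFO queue,
 * and a fixed number of workers drain the queues in round-robin order.
 */
public class KeyedRoundRobinExecutor {

    private final Map<String, Queue<Runnable>> queues = new HashMap<>();
    private final List<String> keys = new ArrayList<>();
    private int next = 0;

    public KeyedRoundRobinExecutor(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            Thread worker = new Thread(this::workerLoop, "fair-io-" + i);
            worker.setDaemon(true);
            worker.start();
        }
    }

    /** Enqueue a task under the given id/key. */
    public synchronized void execute(String key, Runnable task) {
        queues.computeIfAbsent(key, k -> {
            keys.add(k);
            return new ArrayDeque<>();
        }).add(task);
        notifyAll();
    }

    /** Cycle through the per-key queues; block while all of them are empty. */
    private synchronized Runnable take() throws InterruptedException {
        while (true) {
            for (int i = 0; i < keys.size(); i++) {
                String key = keys.get(next);
                next = (next + 1) % keys.size();
                Runnable task = queues.get(key).poll();
                if (task != null) {
                    return task;
                }
            }
            wait(); // every queue is currently empty
        }
    }

    private void workerLoop() {
        try {
            while (true) {
                take().run(); // the task itself runs outside the lock
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}

With something like this, a spike of state deletions submitted under one key 
would only get its fair share of the workers instead of occupying the whole 
FIFO queue, and priority schemes could later be layered on top of the same 
structure.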

Re batching: isn't this a potential optimisation that we could consider 
independently of the main issue? It depends on how long a single IO operation 
takes; if it's more than a couple of ms, I would prefer to keep the operations 
separate.

> Triggered checkpoints can be delayed by discarding shared state
> ---------------------------------------------------------------
>
>                 Key: FLINK-26306
>                 URL: https://issues.apache.org/jira/browse/FLINK-26306
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> Quick note: CheckpointCleaner is not involved here.
> When a checkpoint is subsumed, SharedStateRegistry schedules its unused 
> shared state for async deletion. It uses the common IO pool for this and adds 
> a Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).
> When a checkpoint is started, CheckpointCoordinator uses the same thread 
> pool to initialize the location for it (see 
> CheckpointCoordinator.initializeCheckpoint).
> The thread pool has a fixed size 
> ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size];
>  by default it's the number of CPU cores) and uses a FIFO queue for tasks.
> When there is a spike in state deletion, the next checkpoint is delayed 
> waiting for an available IO thread (a standalone illustration follows below 
> this description).
> Back-pressure seems reasonable here (similar to CheckpointCleaner); however, 
> this shared state deletion could be spread across multiple subsequent 
> checkpoints, not necessarily the next one.
> ---- 
> I believe the issue is a pre-existing one, but it particularly affects the 
> changelog state backend, because 1) such spikes are likely there; 2) the 
> workloads are latency-sensitive.
> In the tests, checkpoint duration grows from seconds to minutes immediately 
> after the materialization.
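
To see the head-of-line blocking described above in isolation, here is a 
minimal, self-contained illustration (plain JDK code, nothing Flink-specific; 
the task count and sleep times are made-up numbers): a spike of slow 
"deletion" tasks on a fixed-size FIFO pool delays a subsequently submitted 
"checkpoint initialization" task by the length of the whole backlog.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Minimal illustration of head-of-line blocking on a shared FIFO pool. */
public class SharedPoolBlockingDemo {

    public static void main(String[] args) throws InterruptedException {
        // Fixed-size pool with a FIFO task queue, like the JobManager IO pool.
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        long start = System.nanoTime();

        // A spike of shared state deletions: one Runnable per state handle.
        for (int i = 0; i < 1_000; i++) {
            ioPool.execute(() -> sleep(10)); // each "deletion" takes ~10 ms
        }

        // The next checkpoint's location initialization queues behind all of
        // them: 1000 tasks x 10 ms on 4 threads puts it roughly 2.5 s later.
        ioPool.execute(() -> System.out.printf(
                "checkpoint init started after %d ms%n",
                (System.nanoTime() - start) / 1_000_000));

        ioPool.shutdown();
        ioPool.awaitTermination(1, TimeUnit.MINUTES);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}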



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
