[ 
https://issues.apache.org/jira/browse/FLINK-26306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496790#comment-17496790
 ] 

Roman Khachatryan commented on FLINK-26306:
-------------------------------------------

> > 1. Batch deletions and leave one thread idle (e.g. group 1K handles into 10 
> > big batches handled by 11 IO threads)
 
> Would it even work if you hardcoded in the CheckpointCoordinator assumptions 
> about pool size and the number of used threads? We don't know how else this 
> thread pool is being used.
 
We can avoid hardcoding these numbers and put the logic into some wrapper 
around the pool (with a new method that accepts a list of Runnables or Handles).
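
Roughly something like the sketch below (the class/method names, e.g. 
BatchingIoExecutor#executeAll, are made up for illustration, not an existing 
Flink API): group the submitted Runnables into at most poolSize - 1 batches so 
that at least one IO thread stays free for checkpoint initialization.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/**
 * Thin wrapper around the shared JM IO pool: takes a whole list of deletion
 * Runnables and re-groups them into a few coarse-grained batches, so that a
 * deletion spike occupies only a bounded number of pool threads.
 */
class BatchingIoExecutor {

    private final Executor ioPool;  // the shared jobmanager IO pool
    private final int maxBatches;   // e.g. poolSize - 1, keeps one thread idle

    BatchingIoExecutor(Executor ioPool, int maxBatches) {
        this.ioPool = ioPool;
        this.maxBatches = maxBatches;
    }

    /** Submits many small tasks as at most maxBatches bigger ones. */
    void executeAll(List<Runnable> tasks) {
        if (tasks.isEmpty()) {
            return;
        }
        int batchSize = Math.max(1, (tasks.size() + maxBatches - 1) / maxBatches);
        for (int from = 0; from < tasks.size(); from += batchSize) {
            List<Runnable> batch = new ArrayList<>(
                    tasks.subList(from, Math.min(from + batchSize, tasks.size())));
            ioPool.execute(() -> batch.forEach(Runnable::run));
        }
    }
}
{code}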
 
> Is this the right level to provide back pressure functionality?
 
> Don't we already have a backpressure mechanism on a higher level? 
> CheckpointRequestDecider#numberOfCleaningCheckpointsSupplier from FLINK-17073?
 
Currently, deletions from SharedStateRegistry don't go through the 
CheckpointCleaner. SharedStateRegistry differs in the following:
1. A handle is associated with multiple checkpoints (or none at the time of 
deletion) - so a separate counter would be needed
2. It's not clear to me what number of pending handle deletions should be 
allowed
Conceptually, it should be a condition instead of a number:
- when scheduling shared state deletion upon checkpoint subsumption,
- if there are still pending deletions from a previous subsumption, don't allow 
new checkpoints,
- start allowing new checkpoints again once newPending <= oldPending (see the 
sketch below).
This would be achieved naturally by batching deletions; implemented in 
CheckpointRequestDecider, it could add extra complexity.
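
A very rough sketch of that condition (the names, e.g. SharedStateCleanupGate, 
are hypothetical and not a concrete API proposal):

{code:java}
import java.util.concurrent.atomic.AtomicLong;

/** Tracks pending shared-state deletions and gates checkpoint triggering on them. */
class SharedStateCleanupGate {

    private final AtomicLong pendingDeletions = new AtomicLong(); // handles queued for discard
    private volatile long pendingAtLastSubsumption;               // the "oldPending" snapshot

    void onDeletionScheduled() { pendingDeletions.incrementAndGet(); }

    void onDeletionCompleted() { pendingDeletions.decrementAndGet(); }

    /** Called on checkpoint subsumption, before its shared state is scheduled for deletion. */
    void onSubsumption() {
        pendingAtLastSubsumption = pendingDeletions.get();
    }

    /** The condition above: allow new checkpoints once newPending <= oldPending. */
    boolean allowTriggering() {
        return pendingDeletions.get() <= pendingAtLastSubsumption;
    }
}
{code}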
 
The advantage of doing it in CheckpointRequestDecider is more accurate 
reporting of checkpoint duration.
 
Maybe batching can be a short-term solution, which can then be evolved 
gradually (by replacing the executor in the wrapper with multiple queues, and 
then checking the queue size in CheckpointRequestDecider). WDYT?
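
For that gradual evolution, the wrapper could keep its own deletion queue, 
drain it with a bounded number of IO threads, and expose the queue size for 
the decider to check. Again just a sketch with made-up names 
(QueuedDeletionExecutor etc.):

{code:java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Keeps deletions in a dedicated queue, drained by at most maxDrainers IO threads. */
class QueuedDeletionExecutor {

    private final Executor ioPool;
    private final Queue<Runnable> deletions = new ConcurrentLinkedQueue<>();
    private final AtomicInteger activeDrainers = new AtomicInteger();
    private final int maxDrainers; // e.g. poolSize - 1

    QueuedDeletionExecutor(Executor ioPool, int maxDrainers) {
        this.ioPool = ioPool;
        this.maxDrainers = maxDrainers;
    }

    void scheduleDeletion(Runnable deletion) {
        deletions.add(deletion);
        maybeStartDrainer();
    }

    /** Queue size that CheckpointRequestDecider could take into account. */
    int pendingDeletions() {
        return deletions.size();
    }

    private void maybeStartDrainer() {
        int current = activeDrainers.get();
        while (current < maxDrainers) {
            if (activeDrainers.compareAndSet(current, current + 1)) {
                ioPool.execute(this::drain);
                return;
            }
            current = activeDrainers.get();
        }
    }

    private void drain() {
        Runnable next;
        while ((next = deletions.poll()) != null) {
            next.run();
        }
        activeDrainers.decrementAndGet();
        // A deletion may have been enqueued after poll() returned null but before
        // the decrement above; re-check so it is not left behind.
        if (!deletions.isEmpty()) {
            maybeStartDrainer();
        }
    }
}
{code}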
 
> It looks like simple fair io thread pool as I described above, without any 
> priorities + adjusting/relaxing 
> numberOfCleaningCheckpointsSupplier.getAsInt() > 
> maxConcurrentCheckpointAttempts check to something like 
> numberOfCleaningCheckpointsSupplier.getAsInt() > 
> maxConcurrentCheckpointAttempts + CONSTANT would do the trick, wouldn't it?
IIUC, it wouldn't exert back-pressure, because the number of handles is not 
directly related to the number of checkpoints to clean (I'm assuming separate 
thread pools for discarding shared state and for initializing new checkpoints).
Taken to the extreme, checkpoints might consist solely of shared state, so 
checkpoint deletion itself will be fast, but discarding the shared state might 
take arbitrarily long.

> Triggered checkpoints can be delayed by discarding shared state
> ---------------------------------------------------------------
>
>                 Key: FLINK-26306
>                 URL: https://issues.apache.org/jira/browse/FLINK-26306
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> Quick note: CheckpointCleaner is not involved here.
> When a checkpoint is subsumed, SharedStateRegistry schedules its unused 
> shared state for async deletion. It uses the common IO pool for this and adds 
> a Runnable per state handle (see SharedStateRegistryImpl.scheduleAsyncDelete).
> When a checkpoint is started, CheckpointCoordinator uses the same thread pool 
> to initialize the location for it. (see 
> CheckpointCoordinator.initializeCheckpoint)
> The thread pool has a fixed size 
> ([jobmanager.io-pool.size|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-io-pool-size];
>  by default it's the number of CPU cores) and uses a FIFO queue for tasks.
> When there is a spike in state deletion, the next checkpoint is delayed 
> waiting for an available IO thread.
> Back-pressure seems reasonable here (similar to CheckpointCleaner); however, 
> this shared state deletion could be spread across multiple subsequent 
> checkpoints, not necessarily only the next one.
> ---- 
> I believe the issue is a pre-existing one; but it particularly affects the 
> changelog state backend, because 1) such spikes are likely there; 2) its 
> workloads are latency-sensitive.
> In the tests, checkpoint duration grows from seconds to minutes immediately 
> after the materialization.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
