[ 
https://issues.apache.org/jira/browse/FLINK-13808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946458#comment-16946458
 ] 

Congxian Qiu(klion26) commented on FLINK-13808:
-----------------------------------------------

Thanks for the reply [~sewen] 

I agree we should add {{taskmanager.checkpoints.max-concurrent}} as a safety 
net, but this may be tricky if maxConcurrentCheckpoints > 1. For example, 
suppose maxConcurrentCheckpoints = 2 and checkpoint 1 and checkpoint 2 are both 
running. Checkpoint 2 is declined for some reason, so we trigger checkpoint 3, 
while on some TMs the snapshots of checkpoint 1 and checkpoint 2 are still 
running. If we now cancel the earliest checkpoint, we may choose checkpoint 1, 
even though checkpoint 1 might still succeed.
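
To make the scenario concrete, here is a minimal sketch of a TM-side limiter 
that cancels the earliest running snapshot once the limit is reached. The class 
{{EarliestSnapshotCanceller}} and its methods are hypothetical names used only 
for illustration; they are not Flink code and not a proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical TM-side limiter; illustrative only, not Flink code. */
final class EarliestSnapshotCanceller {
    private final int limit;
    // Ids of snapshots still running on this TM; the earliest id sorts first.
    private final TreeSet<Long> running = new TreeSet<>();

    EarliestSnapshotCanceller(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be >= 1");
        }
        this.limit = limit;
    }

    /** Starts a snapshot and returns the ids cancelled to stay within the limit. */
    List<Long> start(long checkpointId) {
        List<Long> cancelled = new ArrayList<>();
        while (running.size() >= limit) {
            // "Cancel the earliest" policy: drop the smallest running id.
            cancelled.add(running.pollFirst());
        }
        running.add(checkpointId);
        return cancelled;
    }

    /** Returns the ids of snapshots that are still running, earliest first. */
    List<Long> runningIds() {
        return new ArrayList<>(running);
    }

    public static void main(String[] args) {
        EarliestSnapshotCanceller tm = new EarliestSnapshotCanceller(2);
        tm.start(1L);
        tm.start(2L); // checkpoint 2 is the one declined elsewhere
        System.out.println("cancelled: " + tm.start(3L));
        System.out.println("running: " + tm.runningIds());
    }
}
```

With a limit of 2, starting checkpoint 3 cancels checkpoint 1 and keeps the 
snapshot of the already declined checkpoint 2 running, which is exactly the 
case I am worried about.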

So I propose adding the configuration option 
{{taskmanager.checkpoints.max-concurrent}} with a default that depends on 
maxConcurrentCheckpoints:
 * If maxConcurrentCheckpoints = 1, the default is 1.
 * If maxConcurrentCheckpoints > 1, the default is unlimited.
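
The default rule could be sketched roughly as follows. This is only an 
illustration under my own assumptions: the helper class and method names are 
hypothetical, the option does not exist in Flink yet, and "unlimited" is 
modelled here as an empty {{OptionalInt}}.

```java
import java.util.OptionalInt;

/** Hypothetical helper for the proposed option; not part of Flink. */
final class TaskManagerCheckpointLimitDefaults {

    private TaskManagerCheckpointLimitDefaults() {}

    /**
     * Resolves the effective TM-side limit.
     *
     * @param maxConcurrentCheckpoints the job-level setting (must be >= 1)
     * @param explicitLimit the user-configured value of the proposed option, if any
     * @return the limit, or empty for "unlimited"
     */
    static OptionalInt resolve(int maxConcurrentCheckpoints, OptionalInt explicitLimit) {
        if (maxConcurrentCheckpoints < 1) {
            throw new IllegalArgumentException("maxConcurrentCheckpoints must be >= 1");
        }
        if (explicitLimit.isPresent()) {
            // An explicitly configured value always wins over the default.
            return explicitLimit;
        }
        // Default: 1 for a single concurrent checkpoint, unlimited otherwise.
        return maxConcurrentCheckpoints == 1 ? OptionalInt.of(1) : OptionalInt.empty();
    }
}
```

For example, {{resolve(1, OptionalInt.empty())}} yields a limit of 1, while 
{{resolve(2, OptionalInt.empty())}} yields an empty value, meaning unlimited.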

Yes, as you described, if the cancellation messages do not take effect quickly 
enough, it is possible that no checkpoint completes anymore.

As described above, I'll create an issue to add the 
{{taskmanager.checkpoints.max-concurrent}} option. What do you think about this?

 

Aside from this, I reported another issue (FLINK-13861): the JM will never 
trigger a checkpoint again if cancelling an expired checkpoint throws an 
exception.

> Checkpoints expired by timeout may leak RocksDB files
> -----------------------------------------------------
>
>                 Key: FLINK-13808
>                 URL: https://issues.apache.org/jira/browse/FLINK-13808
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.8.0, 1.8.1
>         Environment: So far only reliably reproducible on a 4-node cluster 
> with parallelism ≥ 100. But do try 
> https://github.com/jcaesar/flink-rocksdb-file-leak
>            Reporter: Julius Michaelis
>            Priority: Minor
>
> A RocksDB state backend with HDFS checkpoints, with or without local 
> recovery, may leak files in {{io.tmp.dirs}} on checkpoint expiry by timeout.
> If the size of a checkpoint crosses what can be transferred during one 
> checkpoint timeout, checkpoints will continue to fail forever. If this is 
> combined with a quick rollover of SST files (e.g. due to a high density of 
> writes), this may quickly exhaust available disk space (or memory, as /tmp is 
> the default location).
> As a workaround, the jobmanager's REST API can be frequently queried for 
> failed checkpoints, and associated files deleted accordingly.
> I've tried investigating the cause a little bit, but I'm stuck:
>  * {{Checkpoint 19 of job ac7efce3457d9d73b0a4f775a6ef46f8 expired before 
> completing.}} and similar gets printed, so
>  * [{{abortExpired}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L547-L549],
>  so
>  * [{{dispose}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L416],
>  so
>  * [{{cancelCaller}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L488],
>  so
>  * [the canceler is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L497]
>  ([through one more 
> layer|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129]),
>  so
>  * [{{cleanup}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L95],
>  (possibly [not from 
> {{cancel}}|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L84]),
>  so
>  * [{{cleanupProvidedResources}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L162]
>  (this is the indirection that made me give up), so
>  * [this trace 
> log|https://github.com/apache/flink/blob/release-1.8.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L372]
>  should be printed, but it isn't.
> I have some time to further investigate, but I'd appreciate help on finding 
> out where in this chain things go wrong.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
