pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1358178168
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -73,8 +83,12 @@ public void cleanCheckpoint(
Executor executor) {
Checkpoint.DiscardObject discardObject =
shouldDiscard ? checkpoint.markAsDiscarded() :
Checkpoint.NOOP_DISCARD_OBJECT;
-
- cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+ LOG.debug("Clean checkpoint {} fast-mode={}",
checkpoint.getCheckpointID(), parallelMode);
+ if (parallelMode) {
+ cleanup(checkpoint, () -> discardObject.discard(executor),
postCleanAction, executor);
Review Comment:
What's the purpose of introducing this two-level async action in the
`ioExecutor`? The first level is the loop in `Checkpoint#discardOperatorStates`,
which schedules second-level async actions that do the actual cleanup. That
doesn't sound right. Apart from unnecessarily blocking one IO thread, it
introduces opportunities for deadlocks. What if we have N threads and N
simultaneous checkpoints to clean up? The `ioExecutor` could end up
fully blocked by the no-op `Checkpoint#discardOperatorStates` actions, which
cannot complete because no thread is left to run the second-level actions.
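To make the starvation concern concrete, here is a minimal standalone sketch. It is not Flink code: the class and method names are hypothetical, and it assumes the first-level action blocks until its second-level action finishes. A timeout stands in for the indefinite wait so the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; none of these names exist in Flink.
class NestedSchedulingStarvation {

    /** Returns how many outer tasks timed out waiting for their own inner task. */
    static int countStalledOuterTasks(int poolSize, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch allStarted = new CountDownLatch(poolSize);
        CountDownLatch allResolved = new CountDownLatch(poolSize);
        List<Future<Boolean>> outers = new ArrayList<>();
        try {
            for (int i = 0; i < poolSize; i++) {
                outers.add(pool.submit(() -> {
                    // Wait until every pool thread is running an outer task.
                    allStarted.countDown();
                    allStarted.await();
                    // Second level: scheduled on the same, already saturated pool.
                    Future<?> inner = pool.submit(() -> { });
                    boolean timedOut;
                    try {
                        inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                        timedOut = false;
                    } catch (TimeoutException e) {
                        timedOut = true; // an untimed get() would block forever here
                    }
                    // Hold the thread until every outer task has its verdict,
                    // so the result does not depend on timing.
                    allResolved.countDown();
                    allResolved.await();
                    return timedOut;
                }));
            }
            int stalled = 0;
            for (Future<Boolean> outer : outers) {
                if (outer.get()) {
                    stalled++;
                }
            }
            return stalled;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int stalled = countStalledOuterTasks(3, 200);
        System.out.println(stalled + " of 3 outer tasks stalled");
    }
}
```

With N outer tasks on N threads, every inner task sits in the queue while all threads wait on it, which is the situation described above.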
Since you are already adding a `getDiscardables()` method, you could create
the discardables and schedule their discarding (in the `ioExecutor`) directly
from the thread that's calling `CheckpointsCleaner#cleanCheckpoint`, plus an
additional `CompletableFuture.allOf(...)` that, once all of them complete, would perform:
```
decrementNumberOfCheckpointsToClean();
postCleanupAction.run();
```
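A rough sketch of what I have in mind, under assumptions: the discardables are modelled as plain `Runnable`s (the real return type of `getDiscardables()` may differ), and the class name and the `checkpointsToClean` counter are hypothetical stand-ins for the cleaner's bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the actual CheckpointsCleaner implementation.
class FlatCheckpointCleanup {

    // Stand-in for the cleaner's count of checkpoints still being cleaned.
    static final AtomicInteger checkpointsToClean = new AtomicInteger();

    static CompletableFuture<Void> cleanCheckpoint(
            List<Runnable> discardables, Runnable postCleanAction, Executor ioExecutor) {
        checkpointsToClean.incrementAndGet();
        // Single level: the calling thread schedules each discard directly,
        // so no pool thread is parked waiting for other pool tasks.
        CompletableFuture<?>[] discards = discardables.stream()
                .map(d -> CompletableFuture.runAsync(d, ioExecutor))
                .toArray(CompletableFuture[]::new);
        // Runs once all discards have finished, whether or not any failed.
        return CompletableFuture.allOf(discards)
                .whenComplete((ignored, failure) -> {
                    checkpointsToClean.decrementAndGet();
                    postCleanAction.run();
                });
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        List<Runnable> discardables = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            int id = i;
            discardables.add(() -> System.out.println("discarding state " + id));
        }
        cleanCheckpoint(discardables, () -> System.out.println("post-clean action"), ioExecutor)
                .join();
        ioExecutor.shutdown();
    }
}
```

The completion callback runs on whichever thread finishes the last discard (or on the caller if everything is already done), so it should stay cheap. Even a single-threaded `ioExecutor` makes progress here because nothing in the pool waits on other pool tasks.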