yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1362546056

##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        if (shouldDiscard) {
+            incrementNumberOfCheckpointsToClean();
+            checkpoint
+                    .markAsDiscarded()
+                    .discardAsync(executor)
+                    .handle(
+                            (Object outerIgnored, Throwable outerThrowable) -> {
+                                if (outerThrowable != null) {
+                                    LOG.warn(
+                                            "Could not properly discard completed checkpoint {}.",
+                                            checkpoint.getCheckpointID(),
+                                            outerThrowable);
+                                }
+                                decrementNumberOfCheckpointsToClean();

Review Comment:
   For shouldDiscard=false there is no cleanup action. The original code does incrementNumberOfCheckpointsToClean() -> { cleanupAction } -> finally { decrementNumberOfCheckpointsToClean(); postCleanupAction }, so it looks like we can just run postCleanupAction in that case.


##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
 
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   Nothing misleading here : ) I thought that if the default were set to true, we could just treat this as an improvement rather than a feature with a new config.
   There are places (in shutdown) that call Checkpoint.DiscardObject#discard directly without going through CheckpointsCleaner, so those have to discard sequentially. For discardAsync, do you think it is better to keep the option of choosing between the slower way and the faster way? If so, I will add the config back. I appreciate your reviews! @pnowojski
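
   To make the shouldDiscard=false suggestion concrete, here is a rough sketch of the control flow I have in mind. It is not the PR's code: CleanerSketch and the discardAsync supplier are hypothetical stand-ins for CheckpointsCleaner and for checkpoint.markAsDiscarded().discardAsync(executor), and running the post-clean action on the executor in the no-discard branch is an assumption on my side.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for CheckpointsCleaner; illustration only, not Flink code. */
public class CleanerSketch {

    // Stand-in for the cleaner's internal counter of pending cleanups.
    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger();

    public int getNumberOfCheckpointsToClean() {
        return numberOfCheckpointsToClean.get();
    }

    /**
     * @param discardAsync hypothetical supplier standing in for
     *     checkpoint.markAsDiscarded().discardAsync(executor)
     */
    public void cleanCheckpoint(
            Supplier<CompletableFuture<Void>> discardAsync,
            boolean shouldDiscard,
            Runnable postCleanAction,
            Executor executor) {
        if (!shouldDiscard) {
            // No cleanup action exists, so the counter is left untouched and
            // only the post-clean action runs (on the executor: an assumption).
            executor.execute(postCleanAction);
            return;
        }

        numberOfCheckpointsToClean.incrementAndGet();
        discardAsync
                .get()
                .handle(
                        (ignored, throwable) -> {
                            if (throwable != null) {
                                System.err.println(
                                        "Could not properly discard completed checkpoint: "
                                                + throwable);
                            }
                            // Mirrors the old finally block: decrement first,
                            // then run the post-clean action.
                            numberOfCheckpointsToClean.decrementAndGet();
                            postCleanAction.run();
                            return null;
                        });
    }
}
```

   With this shape the counter only moves when there is real discard work, and postCleanAction still runs exactly once in both branches.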