yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1362546056


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -71,10 +70,26 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : 
Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        if (shouldDiscard) {
+            incrementNumberOfCheckpointsToClean();
+            checkpoint
+                    .markAsDiscarded()
+                    .discardAsync(executor)
+                    .handle(
+                            (Object outerIgnored, Throwable outerThrowable) -> 
{
+                                if (outerThrowable != null) {
+                                    LOG.warn(
+                                            "Could not properly discard 
completed checkpoint {}.",
+                                            checkpoint.getCheckpointID(),
+                                            outerThrowable);
+                                }
+                                decrementNumberOfCheckpointsToClean();

Review Comment:
   for shouldDiscard=false, there is no cleanup action, the original code does 
   incrementNumberOfCheckpointsToClean(); -> {  cleanupAction  } -> finally { 
decrementNumberOfCheckpointsToClean(); postCleanupAction }, so it looks like 
can just run postCleanupAction



##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed 
checkpoints to retain.");
 
+    /**
+     * Option whether to clean individual checkpoint's operatorstates in 
parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService 
passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   no misleading here : ) I thought if default set true, maybe just treat this 
as an improvement than a feature with a new config.  there are places (in 
shutdown) directly call checkpoint$discardObject#discard without going through 
CheckpointCleaner, so those have to do it sequentially.  for discardAsync do 
you think it is better to have the optionto do it a slower way and do it a 
faster way? if so I will add back the config. appreciate your reviews! 
@pnowojski 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to