masteryhx commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1335332614


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -368,6 +373,34 @@ public void discard() throws Exception {
             }
         }
 
+        @Override
+        public void discard() throws Exception {
+            discard(null);
+        }
+
+        private void discardOperatorStates(Executor ioExecutor) throws Exception {
+            if (ioExecutor == null) {
+                StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+            } else {
+                List<StateObject> discardables =
+                        operatorStates.values().stream()
+                                .flatMap(op -> op.getDiscardables().stream())
+                                .collect(Collectors.toList());
+                LOG.trace("Executing discard {} operator states {}", discardables.size());

Review Comment:
   The LOG parameters do not match: the format string has two `{}` placeholders, but only one argument (`discardables.size()`) is passed.
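
   To illustrate the mismatch, here is a minimal, self-contained sketch. `PlaceholderCheck` is a hypothetical helper (not part of Flink or SLF4J) that counts `{}` placeholders and compares the count with the number of arguments. It ignores SLF4J details such as escaped placeholders and a trailing Throwable argument. The second format string is only one possible fix, not necessarily the wording the PR should use.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of Flink or SLF4J. */
class PlaceholderCheck {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\}");

    /** Counts the "{}" placeholders in an SLF4J-style format string. */
    static int countPlaceholders(String format) {
        Matcher matcher = PLACEHOLDER.matcher(format);
        int count = 0;
        while (matcher.find()) {
            count++;
        }
        return count;
    }

    /** Returns true when the placeholder count equals the number of arguments. */
    static boolean matches(String format, Object... args) {
        return countPlaceholders(format) == args.length;
    }

    public static void main(String[] args) {
        // The call from the diff: two placeholders, one argument.
        System.out.println(matches("Executing discard {} operator states {}", 3)); // false
        // One possible fix (an assumption): drop the trailing placeholder.
        System.out.println(matches("Executing discard of {} operator states", 3)); // true
    }
}
```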



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -368,6 +373,34 @@ public void discard() throws Exception {
             }
         }
 
+        @Override
+        public void discard() throws Exception {
+            discard(null);
+        }
+
+        private void discardOperatorStates(Executor ioExecutor) throws Exception {

Review Comment:
   This logic is similar to the one in PendingCheckpoint, right?
   Could you unify the two and move the logic into a single method?
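
   As a rough sketch of what such a shared method could look like: `Discardable` and `SharedDiscard` below are hypothetical stand-ins, not Flink classes, and waiting on the futures before returning is an assumption rather than what the PR does. The helper discards sequentially when `ioExecutor` is null and in parallel otherwise, and attempts every object before rethrowing the first failure.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a state object; only the discard hook is modeled. */
interface Discardable {
    void discard() throws Exception;
}

/** Sketch of one helper that both checkpoint classes could delegate to. */
final class SharedDiscard {
    private SharedDiscard() {}

    /** Discards sequentially if ioExecutor is null, otherwise in parallel on it. */
    static void discardAll(Collection<? extends Discardable> objects, Executor ioExecutor)
            throws Exception {
        // Thread-safe, because parallel tasks may record failures concurrently.
        Queue<Exception> failures = new ConcurrentLinkedQueue<>();
        if (ioExecutor == null) {
            for (Discardable d : objects) {
                tryDiscard(d, failures);
            }
        } else {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Discardable d : objects) {
                futures.add(CompletableFuture.runAsync(() -> tryDiscard(d, failures), ioExecutor));
            }
            // Assumption: the caller wants to wait until all discards have finished.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        Exception first = failures.poll();
        if (first != null) {
            for (Exception other : failures) {
                first.addSuppressed(other);
            }
            throw first;
        }
    }

    /** Best effort: records the failure instead of aborting the remaining discards. */
    private static void tryDiscard(Discardable d, Queue<Exception> failures) {
        try {
            d.discard();
        } catch (Exception e) {
            failures.add(e);
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger discarded = new AtomicInteger();
        Discardable state = discarded::incrementAndGet;
        List<Discardable> states = List.of(state, state, state);

        discardAll(states, null); // sequential path
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            discardAll(states, pool); // parallel path
        } finally {
            pool.shutdown();
        }
        System.out.println(discarded.get()); // 6
    }
}
```

   Both call sites would then pass their own collection of state objects and the optional executor, so the null check and the failure handling live in one place.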



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java:
##########
@@ -72,7 +72,7 @@ class CheckpointCoordinatorFailureTest {
 
     /**
      * Tests that a failure while storing a completed checkpoint in the completed checkpoint store
-     * will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
+     * will properly fail the originating pending checkpoint and clean up the completed checkpoint.

Review Comment:
   This could be separated into an extra commit, along with the change in `CompletedCheckpointStoreTest`.



##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
 
+    /**
+     * Option whether to clean each checkpoint's states in fast mode. When in fast mode, operator
+     * states are discarded in parallel using the ExecutorService passed to the cleaner, otherwise
+     * operator states are discarded sequentially.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_FAST_MODE =

Review Comment:
   Could you also clarify the benefits and costs of this option in its description?



##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -109,6 +109,20 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
 
+    /**
+     * Option whether to clean each checkpoint's states in fast mode. When in fast mode, operator
+     * states are discarded in parallel using the ExecutorService passed to the cleaner, otherwise
+     * operator states are discarded sequentially.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_FAST_MODE =

Review Comment:
   How about naming it parallel-mode instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.