yigress commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1367568908


##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -109,6 +109,19 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed checkpoints to retain.");
 
+    /* Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Option whether to discard a checkpoint's states in parallel using"
+                                    + " the ExecutorService passed into the cleaner");

Review Comment:
   This is nice! Add the config to the test randomization.
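
   For context, here is a minimal sketch of what "discard in parallel using the ExecutorService" could look like. It is not the code from this PR: `ParallelDiscardSketch`, `Discardable`, and `discardAll` are hypothetical names, and the `parallel` flag only stands in for the new boolean option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch, not the Flink implementation. */
class ParallelDiscardSketch {

    /** Stand-in for a piece of state that can be discarded (assumed interface). */
    interface Discardable {
        void discardState() throws Exception;
    }

    /**
     * Discards every state and returns once all discards have finished.
     * With parallel == false the calling thread does the work one state at a time;
     * with parallel == true each discard is submitted to the given executor.
     */
    static void discardAll(
            List<? extends Discardable> states, ExecutorService executor, boolean parallel) {
        if (!parallel) {
            for (Discardable state : states) {
                discardUnchecked(state);
            }
            return;
        }
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Discardable state : states) {
            futures.add(CompletableFuture.runAsync(() -> discardUnchecked(state), executor));
        }
        // Wait for all submitted discards; a failed discard surfaces here as an exception.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }

    /** Wraps the checked exception so the call can be used inside a lambda. */
    private static void discardUnchecked(Discardable state) {
        try {
            state.discardState();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger discarded = new AtomicInteger();
        List<Discardable> states = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            states.add(discarded::incrementAndGet);
        }
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            discardAll(states, executor, true);
        } finally {
            executor.shutdown();
        }
        System.out.println("discarded=" + discarded.get()); // prints discarded=8
    }
}
```

   The trade-off is the one the option's comment already names: cleanup finishes sooner, but the storage backend sees several discard calls at once.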



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4303,4 +4310,40 @@ private void ackCheckpoint(
                                                 .build()))),
                 "test");
     }
+
+    static class OperatorSubtaskStateMock {
+        OperatorSubtaskState subtaskState;
+        OperatorStateHandle managedOpHandle;
+        OperatorStateHandle rawOpHandle;
+
+        OperatorSubtaskStateMock() throws Exception {
+            this.managedOpHandle = mock(OperatorStreamStateHandle.class);
+            this.rawOpHandle = mock(OperatorStreamStateHandle.class);
+            OperatorSubtaskState subtaskState =
+                    OperatorSubtaskState.builder()
+                            .setManagedOperatorState(managedOpHandle)
+                            .setRawOperatorState(rawOpHandle)
+                            .build();
+            this.subtaskState = spy(subtaskState);
+        }
+
+        public OperatorSubtaskState getSubtaskState() {
+            return this.subtaskState;
+        }
+
+        public void reset() {
+            Mockito.reset(managedOpHandle);
+            Mockito.reset(rawOpHandle);
+        }
+
+        public void verifyDiscard() throws Exception {
+            verify(managedOpHandle, times(1)).discardState();
+            verify(rawOpHandle, times(1)).discardState();
+        }
+
+        public void verifyNotDiscard() throws Exception {
+            verify(managedOpHandle, never()).discardState();
+            verify(rawOpHandle, never()).discardState();
+        }
+    }

Review Comment:
   Updated. Great instructions on the mock usage (or rather, the no-mock usage)!
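
   One reading of "no-mock usage" is a hand-written test double that records calls itself, so the assertions need no Mockito `verify`. The sketch below assumes that reading. `StateHandle` and `CountingStateHandle` are hypothetical stand-ins, not Flink classes or the code in the updated PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a mock-free test double. */
class CountingHandleSketch {

    /** Assumed minimal interface; stands in for a real state handle type. */
    interface StateHandle {
        void discardState() throws Exception;
    }

    /** Test double that counts how often discardState() was called. */
    static final class CountingStateHandle implements StateHandle {
        private final AtomicInteger discardCount = new AtomicInteger();

        @Override
        public void discardState() {
            discardCount.incrementAndGet();
        }

        int getDiscardCount() {
            return discardCount.get();
        }
    }

    public static void main(String[] args) throws Exception {
        CountingStateHandle managed = new CountingStateHandle();
        CountingStateHandle raw = new CountingStateHandle();

        // Plays the role of verifyNotDiscard(): nothing has been discarded yet.
        System.out.println(managed.getDiscardCount() + " " + raw.getDiscardCount()); // prints 0 0

        managed.discardState();
        raw.discardState();

        // Plays the role of verifyDiscard(): each handle was discarded exactly once.
        System.out.println(managed.getDiscardCount() + " " + raw.getDiscardCount()); // prints 1 1
    }
}
```

   An atomic counter keeps the double safe when discards run on several executor threads, which matters once parallel cleaning is enabled.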



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
