pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1366667562


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -71,10 +81,19 @@ public void cleanCheckpoint(
             boolean shouldDiscard,
             Runnable postCleanAction,
             Executor executor) {
-        Checkpoint.DiscardObject discardObject =
-                shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
-        cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+        LOG.debug(
+                "Clean checkpoint {} parallel-mode={} shouldDiscard={}",
+                checkpoint.getCheckpointID(),
+                parallelMode,
+                shouldDiscard);
+        if (parallelMode) {
+            cleanupParallel(checkpoint, shouldDiscard, postCleanAction, executor);
+        } else {
+            Checkpoint.DiscardObject discardObject =
+                    shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+            cleanup(checkpoint, discardObject::discard, postCleanAction, executor);

Review Comment:
   nit: maybe remove this method and replace it with a call:
   ```
   cleanupParallel(checkpoint, shouldDiscard, postCleanAction, Executors.newDirectExecutorService());
   ```
   Wouldn't this behave identically?
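To illustrate the question, here is a minimal, self-contained sketch (not Flink code). `StatePiece`, `cleanupParallel` and `runWithDirectExecutor` are hypothetical stand-ins for the PR's types, and `Runnable::run` stands in for Flink's `Executors.newDirectExecutorService()`. Under these assumptions, running the per-piece discard tasks on a direct executor makes them run one after another on the calling thread, in submission order, and the post-clean action has already run when the call returns. Whether that fully matches the existing sequential path (for example, which thread performs the discard) depends on how `cleanupParallel` is implemented in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class DirectExecutorCleanupSketch {

    /** Hypothetical stand-in for one discardable piece of checkpoint state. */
    interface StatePiece {
        void discard() throws Exception;
    }

    /**
     * Hypothetical parallel cleanup: every piece is discarded as its own task on the given
     * executor, and postCleanAction runs once all discard tasks have completed.
     */
    static CompletableFuture<Void> cleanupParallel(
            List<StatePiece> pieces, Runnable postCleanAction, Executor executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (StatePiece piece : pieces) {
            futures.add(
                    CompletableFuture.runAsync(
                            () -> {
                                try {
                                    piece.discard();
                                } catch (Exception e) {
                                    throw new RuntimeException(e);
                                }
                            },
                            executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenRun(postCleanAction);
    }

    /** Runs the parallel cleanup on a direct executor and returns the order of events. */
    static List<String> runWithDirectExecutor() {
        List<String> events = new ArrayList<>();
        List<StatePiece> pieces =
                List.of(() -> events.add("discard-1"), () -> events.add("discard-2"));
        // A direct executor runs each submitted task synchronously on the calling thread,
        // so the "parallel" code path degenerates into sequential discards.
        Executor direct = Runnable::run;
        CompletableFuture<Void> done =
                cleanupParallel(pieces, () -> events.add("post-clean"), direct);
        // All discards and the post-clean action have happened before cleanupParallel returns.
        events.add("returned, done=" + done.isDone());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runWithDirectExecutor());
    }
}
```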



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4303,4 +4310,40 @@ private void ackCheckpoint(
                                                 .build()))),
                 "test");
     }
+
+    static class OperatorSubtaskStateMock {
+        OperatorSubtaskState subtaskState;
+        OperatorStateHandle managedOpHandle;
+        OperatorStateHandle rawOpHandle;
+
+        OperatorSubtaskStateMock() throws Exception {
+            this.managedOpHandle = mock(OperatorStreamStateHandle.class);
+            this.rawOpHandle = mock(OperatorStreamStateHandle.class);
+            OperatorSubtaskState subtaskState =
+                    OperatorSubtaskState.builder()
+                            .setManagedOperatorState(managedOpHandle)
+                            .setRawOperatorState(rawOpHandle)
+                            .build();
+            this.subtaskState = spy(subtaskState);
+        }
+
+        public OperatorSubtaskState getSubtaskState() {
+            return this.subtaskState;
+        }
+
+        public void reset() {
+            Mockito.reset(managedOpHandle);
+            Mockito.reset(rawOpHandle);
+        }
+
+        public void verifyDiscard() throws Exception {
+            verify(managedOpHandle, times(1)).discardState();
+            verify(rawOpHandle, times(1)).discardState();
+        }
+
+        public void verifyNotDiscard() throws Exception {
+            verify(managedOpHandle, never()).discardState();
+            verify(rawOpHandle, never()).discardState();
+        }
+    }

Review Comment:
   Can you rewrite this class without using `Mockito` (as per our [code style](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations))? I would guess you can replace
   
   ```
   this.managedOpHandle = mock(OperatorStreamStateHandle.class);
   this.rawOpHandle = mock(OperatorStreamStateHandle.class);
   ```
   with simple test implementations of the `org.apache.flink.runtime.state.OperatorStateHandle` interface that record internally whether `discardState()` has been called (a boolean flag) and expose it via a `boolean isDiscarded()` method or something similar.
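A minimal sketch of such a reusable test implementation, assuming a trimmed-down `DiscardableStateHandle` interface in place of the real `OperatorStateHandle` (which has further methods that a real implementation would also have to implement). `TrackingStateHandle` and `SubtaskStateHandles` are hypothetical names. The counter is an `AtomicInteger` rather than a plain boolean because, in parallel mode, `discardState()` may be called from executor threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TrackingStateHandleSketch {

    /**
     * Hypothetical, trimmed-down stand-in for OperatorStateHandle: the real Flink interface has
     * more methods, which a real test implementation would also need to implement.
     */
    interface DiscardableStateHandle {
        void discardState() throws Exception;
    }

    /** Reusable test implementation that records how often discardState() was called. */
    static final class TrackingStateHandle implements DiscardableStateHandle {
        // Thread-safe, since discards may run concurrently on an executor.
        private final AtomicInteger discardCount = new AtomicInteger();

        @Override
        public void discardState() {
            discardCount.incrementAndGet();
        }

        boolean isDiscarded() {
            return discardCount.get() > 0;
        }

        int getDiscardCount() {
            return discardCount.get();
        }
    }

    /** Mockito-free replacement for the verifyDiscard()/verifyNotDiscard() helpers. */
    static final class SubtaskStateHandles {
        final TrackingStateHandle managed = new TrackingStateHandle();
        final TrackingStateHandle raw = new TrackingStateHandle();

        boolean discardedExactlyOnce() {
            return managed.getDiscardCount() == 1 && raw.getDiscardCount() == 1;
        }

        boolean notDiscarded() {
            return !managed.isDiscarded() && !raw.isDiscarded();
        }
    }

    public static void main(String[] args) {
        SubtaskStateHandles handles = new SubtaskStateHandles();
        System.out.println(handles.notDiscarded());
        handles.managed.discardState();
        handles.raw.discardState();
        System.out.println(handles.discardedExactlyOnce());
    }
}
```

Instead of `Mockito.reset(...)`, a test can simply create fresh `TrackingStateHandle` instances.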



##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -109,6 +109,19 @@ public class CheckpointingOptions {
                     .defaultValue(1)
                     .withDescription("The maximum number of completed 
checkpoints to retain.");
 
+    /* Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+     * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+     * This speeds up checkpoints cleaning, but adds load to the IO.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Option whether to discard a checkpoint's states in parallel using"
+                                    + " the ExecutorService passed into the cleaner");

Review Comment:
   It's probably best if you randomize this config value for tests in:
   ```
   org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration
   ```
   similarly to how `org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ENABLE_UNALIGNED` is randomized.
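A self-contained sketch of the idea, not Flink's actual `randomizeConfiguration` code: a `Map<String, String>` stands in for Flink's `Configuration`, and `randomize(...)` is a hypothetical helper. The value is drawn from a seeded `Random`, and only when the test has not set the key itself, so tests that pin the option keep their value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class RandomizeConfigSketch {

    /** Key of the option added in this PR. */
    static final String CLEANER_PARALLEL_MODE = "state.checkpoint.cleaner.parallel-mode";

    /**
     * Hypothetical helper: if the test has not set the key explicitly, pick one of the candidate
     * values with the given (seeded) Random; an explicitly configured value is left untouched.
     */
    @SafeVarargs
    static <T> void randomize(Map<String, String> conf, Random random, String key, T... values) {
        if (!conf.containsKey(key)) {
            T chosen = values[random.nextInt(values.length)];
            conf.put(key, String.valueOf(chosen));
        }
    }

    public static void main(String[] args) {
        // Printing the seed keeps a failing randomized run reproducible.
        long seed = 42L;
        Map<String, String> conf = new HashMap<>();
        randomize(conf, new Random(seed), CLEANER_PARALLEL_MODE, true, false);
        System.out.println("seed=" + seed + " " + conf);
    }
}
```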



-- 
This is an automated message from the Apache Git Service.