pnowojski commented on code in PR #23425:
URL: https://github.com/apache/flink/pull/23425#discussion_r1366667562
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -71,10 +81,19 @@ public void cleanCheckpoint(
boolean shouldDiscard,
Runnable postCleanAction,
Executor executor) {
- Checkpoint.DiscardObject discardObject =
- shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
-
- cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
+ LOG.debug(
+ "Clean checkpoint {} parallel-mode={} shouldDiscard={}",
+ checkpoint.getCheckpointID(),
+ parallelMode,
+ shouldDiscard);
+ if (parallelMode) {
+ cleanupParallel(checkpoint, shouldDiscard, postCleanAction, executor);
+ } else {
+ Checkpoint.DiscardObject discardObject =
+ shouldDiscard ? checkpoint.markAsDiscarded() : Checkpoint.NOOP_DISCARD_OBJECT;
+
+ cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
Review Comment:
nit: maybe remove this method and replace it with a call:
```
cleanupParallel(checkpoint, shouldDiscard, postCleanAction,
Executors.newDirectExecutorService());
```
Wouldn't this behave identically?
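To illustrate the question, here is a minimal, self-contained sketch (not the PR's code). `cleanupParallel` below is a hypothetical stand-in that submits one discard task per state to the given executor, and `Runnable::run` is used as a simple substitute for a direct executor. Under those assumptions, every task runs on the calling thread in submission order, which is what a sequential cleanup would do:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class DirectExecutorSketch {

    // Hypothetical stand-in for the parallel cleanup path: one task per state,
    // all submitted to the supplied executor, then awaited together.
    static List<String> cleanupParallel(List<String> stateNames, Executor executor) {
        List<String> discarded = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String name : stateNames) {
            futures.add(CompletableFuture.runAsync(() -> discarded.add(name), executor));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return discarded;
    }

    public static void main(String[] args) {
        // Assumption: a direct executor runs each task immediately on the caller's thread.
        Executor direct = Runnable::run;
        System.out.println(cleanupParallel(List.of("managed", "raw"), direct));
        // prints [managed, raw]
    }
}
```

Whether the real `cleanupParallel` is equivalent in the same way depends on how it handles `markAsDiscarded()` and the post-clean action, so this only shows the executor side of the argument.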
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4303,4 +4310,40 @@ private void ackCheckpoint(
.build()))),
"test");
}
+
+ static class OperatorSubtaskStateMock {
+ OperatorSubtaskState subtaskState;
+ OperatorStateHandle managedOpHandle;
+ OperatorStateHandle rawOpHandle;
+
+ OperatorSubtaskStateMock() throws Exception {
+ this.managedOpHandle = mock(OperatorStreamStateHandle.class);
+ this.rawOpHandle = mock(OperatorStreamStateHandle.class);
+ OperatorSubtaskState subtaskState =
+ OperatorSubtaskState.builder()
+ .setManagedOperatorState(managedOpHandle)
+ .setRawOperatorState(rawOpHandle)
+ .build();
+ this.subtaskState = spy(subtaskState);
+ }
+
+ public OperatorSubtaskState getSubtaskState() {
+ return this.subtaskState;
+ }
+
+ public void reset() {
+ Mockito.reset(managedOpHandle);
+ Mockito.reset(rawOpHandle);
+ }
+
+ public void verifyDiscard() throws Exception {
+ verify(managedOpHandle, times(1)).discardState();
+ verify(rawOpHandle, times(1)).discardState();
+ }
+
+ public void verifyNotDiscard() throws Exception {
+ verify(managedOpHandle, never()).discardState();
+ verify(rawOpHandle, never()).discardState();
+ }
+ }
Review Comment:
Can you rewrite this class without using `Mockito` (as per our [code style](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations))?
I would guess you can replace
```
this.managedOpHandle = mock(OperatorStreamStateHandle.class);
this.rawOpHandle = mock(OperatorStreamStateHandle.class);
```
with reusable test implementations of the
`org.apache.flink.runtime.state.OperatorStateHandle` interface that record
internally whether `discardState()` has been called (a boolean flag) and
expose that via a `boolean isDiscarded()` method or something similar.
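A rough sketch of the idea, kept self-contained so it does not depend on Flink: `Discardable` is a hypothetical stand-in for the `discardState()` part of `OperatorStateHandle`, and `TrackingHandle` is an illustrative name. A real test implementation would also have to implement the remaining methods of the interface.

```java
public class DiscardTrackingSketch {

    // Hypothetical stand-in for the discard-related part of the real interface.
    interface Discardable {
        void discardState() throws Exception;
    }

    // Test implementation that remembers whether discardState() was called.
    static final class TrackingHandle implements Discardable {
        private boolean discarded;

        @Override
        public void discardState() {
            discarded = true;
        }

        boolean isDiscarded() {
            return discarded;
        }

        // Replaces Mockito.reset(...) between test phases.
        void reset() {
            discarded = false;
        }
    }

    public static void main(String[] args) {
        TrackingHandle managed = new TrackingHandle();
        System.out.println(managed.isDiscarded()); // prints false
        managed.discardState();
        System.out.println(managed.isDiscarded()); // prints true
    }
}
```

With something like this, `verifyDiscard()` and `verifyNotDiscard()` could become plain assertions on `isDiscarded()`. If the test needs the exact `times(1)` semantics, a counter instead of a boolean would preserve it.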
##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -109,6 +109,19 @@ public class CheckpointingOptions {
.defaultValue(1)
.withDescription("The maximum number of completed checkpoints to retain.");
+ /* Option whether to clean individual checkpoint's operatorstates in parallel. If enabled,
+ * operator states are discarded in parallel using the ExecutorService passed to the cleaner.
+ * This speeds up checkpoints cleaning, but adds load to the IO.
+ */
+ @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
+ public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE =
+ ConfigOptions.key("state.checkpoint.cleaner.parallel-mode")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Option whether to discard a checkpoint's states in parallel using"
+ + " the ExecutorService passed into the cleaner");
Review Comment:
It's probably best to randomize this config value for tests in:
```
org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration
```
similarly to how
`org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ENABLE_UNALIGNED`
is randomized.
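As a rough, Flink-independent sketch of what randomizing the option means: the `randomize` helper and the plain `Map` configuration below are hypothetical stand-ins, not the actual `TestStreamEnvironment` code. The idea is to pick one of the allowed values at random, but only when the test has not set the option explicitly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class RandomizeSketch {

    static final String KEY = "state.checkpoint.cleaner.parallel-mode";

    // Hypothetical helper: choose a random value unless the key is already set.
    static void randomize(Map<String, String> conf, String key, Random rnd, boolean... choices) {
        if (!conf.containsKey(key)) {
            conf.put(key, String.valueOf(choices[rnd.nextInt(choices.length)]));
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        randomize(conf, KEY, new Random(), true, false);
        // Either "true" or "false", depending on the random draw.
        System.out.println(KEY + "=" + conf.get(KEY));
    }
}
```

That way both the parallel and the sequential cleanup paths get exercised across test runs without each test having to opt in.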