rkhachatryan commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r491905895



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.StateUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Delegate class responsible for checkpoints cleaning and counting the number 
of checkpoints yet
+ * to clean.
+ */
+public class CheckpointsCleaner implements Serializable{
+       private final AtomicInteger numberOfCheckpointsToClean;
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointsCleaner.class);
+
+       public CheckpointsCleaner() {
+               this.numberOfCheckpointsToClean = new AtomicInteger(0);
+       }
+
+       int getNumberOfCheckpointsToClean() {
+               return numberOfCheckpointsToClean.get();
+       }
+
+       public void cleanCheckpoint(Runnable cleanAction, Runnable 
postCleanAction, Executor executor) {
+               numberOfCheckpointsToClean.incrementAndGet();
+               executor.execute(() -> {
+                       try {
+                               cleanAction.run();
+                       } finally {
+                               numberOfCheckpointsToClean.decrementAndGet();
+                               postCleanAction.run();
+                       }
+               });
+       }
+
+       public void cleanStates(Runnable postCleanAction, Map<OperatorID, 
OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, 
CheckpointStorageLocation targetLocation, Executor executor){
+               numberOfCheckpointsToClean.incrementAndGet();
+               executor.execute(() -> {
+                       // discard the private states.
+                       // unregistered shared states are still considered 
private at this point.
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                               targetLocation.disposeOnFailure();

Review comment:
       I think this logic (together with the `LOG.warn` and `map.clear` calls) 
belongs in `PendingCheckpoint`, because it accesses `PendingCheckpoint` internals.
   That was the case in the original version.
   If we reuse that original `Runnable`, we can even drop `cleanStates` 
and keep only `cleanCheckpoint`.
   WDYT?
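
To make the suggestion concrete, here is a minimal sketch, assuming the pending checkpoint hands its own discard logic to the cleaner as a `Runnable`. `CleanerSketch` and `PendingCheckpointSketch` are hypothetical stand-ins (not Flink classes), and the `String` map only imitates the operator states; only the shape of `cleanCheckpoint` is taken from the diff above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Counts and schedules cleanups; knows nothing about checkpoint internals.
class CleanerSketch {
    private final AtomicInteger numberOfCheckpointsToClean = new AtomicInteger(0);

    int getNumberOfCheckpointsToClean() {
        return numberOfCheckpointsToClean.get();
    }

    // Same shape as cleanCheckpoint in the diff above.
    void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) {
        numberOfCheckpointsToClean.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                numberOfCheckpointsToClean.decrementAndGet();
                postCleanAction.run();
            }
        });
    }
}

// Hypothetical stand-in for PendingCheckpoint; it owns the discard logic.
class PendingCheckpointSketch {
    // Assumption: a String map replaces the real operator states.
    private final Map<String, String> operatorStates = new HashMap<>();
    private boolean discarded;

    PendingCheckpointSketch() {
        operatorStates.put("operator-1", "state-handle");
    }

    // The checkpoint passes its own private discard method to the cleaner.
    void dispose(CleanerSketch cleaner, Runnable postCleanAction, Executor executor) {
        cleaner.cleanCheckpoint(this::discardStates, postCleanAction, executor);
    }

    private void discardStates() {
        try {
            // The real class would discard state objects and the storage location here.
            discarded = true;
        } finally {
            operatorStates.clear();
        }
    }

    boolean isDiscarded() {
        return discarded;
    }

    int remainingStates() {
        return operatorStates.size();
    }
}
```

With this shape the cleaner needs a single entry point, and the map clearing stays private to the checkpoint class.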

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws 
Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are 
too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting 
checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() 
throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new 
CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new 
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new 
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = 
Executors.pausableExecutor();

Review comment:
       I think `ManuallyTriggeredScheduledExecutor` would suffice here.
   Did you consider using it?
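
For illustration, the idea behind a manually triggered executor can be sketched as below. `ManualExecutorSketch` is a hypothetical, simplified stand-in written for this comment; it does not reproduce the API of Flink's `ManuallyTriggeredScheduledExecutor`. Tasks are only queued on `execute`, and the test decides when they run, so no pausing or sleeping is needed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical single-threaded executor: nothing runs until the test says so.
class ManualExecutorSketch implements Executor {
    private final Queue<Runnable> queued = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        // Only remember the task; do not run it yet.
        queued.add(command);
    }

    int numQueued() {
        return queued.size();
    }

    // Runs every queued task on the calling thread, in submission order.
    void triggerAll() {
        Runnable next;
        while ((next = queued.poll()) != null) {
            next.run();
        }
    }
}
```

A test could submit the cleanups, assert on the pending count while nothing has run, then call `triggerAll()` and assert again.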

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws 
Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are 
too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting 
checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() 
throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new 
CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new 
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new 
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = 
Executors.pausableExecutor();
+               ZooKeeperCompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+               //pause the executor to pause checkpoints cleaning, to allow 
assertions
+               executor.pause();
+
+               int nbCheckpointsToInject = 3;
+               for (int i = 1; i <= nbCheckpointsToInject; i++) {
+                       // add checkpoints to clean
+                       TestCompletedCheckpoint completedCheckpoint = new 
TestCompletedCheckpoint(new JobID(), i,
+                               i, Collections.emptyMap(), 
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                               checkpointsCleaner::cleanCheckpoint);
+                       checkpointStore.addCheckpoint(completedCheckpoint);
+               }
+
+               Thread.sleep(100L); // give time to submit checkpoints for 
cleaning
+
+               int nbCheckpointsSubmittedForCleaningByCheckpointStore = 
nbCheckpointsToInject - maxCheckpointsToRetain;
+               
assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, 
checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
       This `Thread.sleep` can be fragile. How about replacing it with a 
polling loop like this one?
   ```
   while (checkpointsCleaner.getNumberOfCheckpointsToClean() < nbCheckpointsToInject - maxCheckpointsToRetain) {
       Thread.sleep(10);
   }
   ```
   
   (The same applies to the next `sleep`.)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -108,6 +110,11 @@
        @Nullable
        private transient volatile CompletedCheckpointStats.DiscardCallback 
discardCallback;
 
+       private final CheckpointsCleaningRunner cleanCallback;
+
+       private final SerializableRunnable checkpointCleaningFinishedCallback;

Review comment:
       Can we use uniform naming here, like `cleanupCallback` and 
`cleanupFinishedCallback`?
   (ditto: `PendingCheckpoint`)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -210,6 +221,21 @@ public void 
registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
        //  Discard and Dispose
        // 
------------------------------------------------------------------------
 
+       /**
+        * Asynchronously call a discard on the ioExecutor
+        * (FixedThreadPool of configurable size of default 4*CPU cores)
+        * and count the number of checkpoints that are waiting to clean.
+        */
+       void 
asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, 
Exception> discardCallback, Executor executor){
+               cleanCallback.accept(() -> {
+                       try {
+                               discardCallback.accept(this);

Review comment:
       A callback that calls another callback, plus `callback.accept(this)`, 
doesn't look very readable to me. 
   
   Looking at `discardCallback`, I see that:
   1. it is a call to `CompletedCheckpoint.doDiscard` if some condition is `true`
   1. the condition can be evaluated early
   
   Therefore, we can get rid of it by:
   1. evaluating the condition as early as possible in 
`ZooKeeperCompletedCheckpointStore` (maybe extracting methods 
`CompletedCheckpoint.shouldDiscardOnShutdown` and `OnSubsume`)
   1. passing `boolean discard` to `tryRemoveCompletedCheckpoint` and further 
down, instead of the callback
   1. replacing `discardCallback.accept(this);` with 
`if (discard) this.doDiscard();` in 
`CompletedCheckpoint.asyncDiscardCheckpointAndCountCheckpoint`
   
   WDYT?
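
A rough sketch of the proposal, under stated assumptions: `CompletedCheckpointSketch` and `StoreSketch` are hypothetical stand-ins for `CompletedCheckpoint` and `ZooKeeperCompletedCheckpointStore`, the two boolean fields imitate the real retention conditions, `shouldDiscardOnSubsume` is a guessed name for the `OnSubsume` variant, and the counting of pending cleanups is left out to keep it short.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for CompletedCheckpoint.
class CompletedCheckpointSketch {
    // Assumption: plain flags replace the real checkpoint properties.
    private final boolean discardOnSubsume;
    private final boolean discardOnShutdown;
    private volatile boolean discarded;

    CompletedCheckpointSketch(boolean discardOnSubsume, boolean discardOnShutdown) {
        this.discardOnSubsume = discardOnSubsume;
        this.discardOnShutdown = discardOnShutdown;
    }

    // The conditions are exposed without exposing doDiscard itself.
    boolean shouldDiscardOnSubsume() {
        return discardOnSubsume;
    }

    boolean shouldDiscardOnShutdown() {
        return discardOnShutdown;
    }

    // A boolean decided by the caller replaces the callback parameter.
    void asyncDiscard(boolean discard, Executor executor) {
        executor.execute(() -> {
            if (discard) {
                doDiscard();
            }
        });
    }

    private void doDiscard() {
        discarded = true;
    }

    boolean isDiscarded() {
        return discarded;
    }
}

// Hypothetical stand-in for the store: it evaluates the condition early.
class StoreSketch {
    static void subsume(CompletedCheckpointSketch checkpoint, Executor executor) {
        boolean discard = checkpoint.shouldDiscardOnSubsume();
        checkpoint.asyncDiscard(discard, executor);
    }
}
```

Here `doDiscard` stays private, so the encapsulation of the checkpoint class is preserved while the store no longer passes a callback.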

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws 
Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are 
too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting 
checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() 
throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new 
CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new 
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new 
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = 
Executors.pausableExecutor();
+               ZooKeeperCompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+               //pause the executor to pause checkpoints cleaning, to allow 
assertions
+               executor.pause();
+
+               int nbCheckpointsToInject = 3;
+               for (int i = 1; i <= nbCheckpointsToInject; i++) {
+                       // add checkpoints to clean
+                       TestCompletedCheckpoint completedCheckpoint = new 
TestCompletedCheckpoint(new JobID(), i,
+                               i, Collections.emptyMap(), 
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                               checkpointsCleaner::cleanCheckpoint);
+                       checkpointStore.addCheckpoint(completedCheckpoint);
+               }
+
+               Thread.sleep(100L); // give time to submit checkpoints for 
cleaning
+
+               int nbCheckpointsSubmittedForCleaningByCheckpointStore = 
nbCheckpointsToInject - maxCheckpointsToRetain;
+               
assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, 
checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
       An infinite loop will still fail the test with a timeout (this is not 
ideal, but I don't see any better way to do this).
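
One possible way to bound the loop explicitly, independent of the test-level timeout, is a small helper with a deadline. This is only a sketch: `WaitSketch.waitUntil` is a hypothetical helper, not an existing Flink or JUnit utility, and the 10 ms poll interval is arbitrary.

```java
import java.util.function.BooleanSupplier;

final class WaitSketch {
    private WaitSketch() {}

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The test could then assert on the returned boolean, which gives a clearer failure message than a generic timeout.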

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -210,6 +221,21 @@ public void 
registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
        //  Discard and Dispose
        // 
------------------------------------------------------------------------
 
+       /**
+        * Asynchronously call a discard on the ioExecutor
+        * (FixedThreadPool of configurable size of default 4*CPU cores)
+        * and count the number of checkpoints that are waiting to clean.
+        */
+       void 
asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, 
Exception> discardCallback, Executor executor){
+               cleanCallback.accept(() -> {
+                       try {
+                               discardCallback.accept(this);

Review comment:
       Sure, we shouldn't break the encapsulation of `CompletedCheckpoint`, but 
we could extract these conditions into public methods (`boolean 
shouldDiscardOnXxx()`) that `ZooKeeperCompletedCheckpointStore` would use.
   If you think it isn't worth it, I'm also fine with the current approach.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

