tillrohrmann commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r496721356



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor();
+               ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+               //pause the executor to pause checkpoints cleaning, to allow assertions
+               executor.pause();
+
+               int nbCheckpointsToInject = 3;
+               for (int i = 1; i <= nbCheckpointsToInject; i++) {
+                       // add checkpoints to clean
+                       TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i,
+                               i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                               checkpointsCleaner::cleanCheckpoint);
+                       checkpointStore.addCheckpoint(completedCheckpoint);
+               }
+
+               Thread.sleep(100L); // give time to submit checkpoints for cleaning
+
+               int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain;
+               assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
       I am not entirely sure which current code snippet we are talking about,
because this comment seems to point at an outdated version. Hence I can only
give my general thoughts: I think Roman has a valid point that relying on
implementation details, such as the number of submitted `Runnables`, is more
brittle than relying on a public API. I have built quite a few tests like this
myself and regretted it shortly afterwards, when I had to refactor them. My
recommendation is therefore to avoid this as much as possible and to rely on
the public API of the components under test instead.
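
To illustrate the general idea, here is a minimal sketch and not the actual Flink code. `ManualExecutor`, `Cleaner`, `Decider` and `canTrigger()` are hypothetical stand-ins invented for this example, and the threshold rule (`pendingCount() <= maxPending`) is an assumption. The test only asserts on what the decider reports through its public method, before and after the queued cleanups run, and never inspects the executor's queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class PublicApiTestSketch {

    /** Hypothetical executor: queues tasks until the test runs them explicitly. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        void runAll() {
            Runnable task;
            while ((task = tasks.poll()) != null) {
                task.run();
            }
        }
    }

    /** Hypothetical cleaner: counts cleanups that were requested but have not finished. */
    static final class Cleaner {
        private final AtomicInteger pending = new AtomicInteger();

        void clean(Runnable discard, Executor executor) {
            pending.incrementAndGet();
            executor.execute(() -> {
                try {
                    discard.run();
                } finally {
                    pending.decrementAndGet();
                }
            });
        }

        int pendingCount() {
            return pending.get();
        }
    }

    /** Hypothetical decider: refuses new requests while too many cleanups are pending. */
    static final class Decider {
        private final int maxPending;
        private final Cleaner cleaner;

        Decider(int maxPending, Cleaner cleaner) {
            this.maxPending = maxPending;
            this.cleaner = cleaner;
        }

        boolean canTrigger() {
            // Assumed rule for this sketch: allow a request up to and including the limit.
            return cleaner.pendingCount() <= maxPending;
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        ManualExecutor executor = new ManualExecutor();
        Cleaner cleaner = new Cleaner();
        Decider decider = new Decider(1, cleaner);

        check(decider.canTrigger(), "nothing pending: requests should be allowed");

        // Two pending cleanups exceed the limit of one.
        cleaner.clean(() -> {}, executor);
        cleaner.clean(() -> {}, executor);
        check(!decider.canTrigger(), "too many pending cleanups: requests should pause");

        // Once the cleanups have run, requests should resume.
        executor.runAll();
        check(decider.canTrigger(), "cleanups finished: requests should resume");
    }
}
```

A test written this way should keep passing if the cleaner later batches its work or submits a different number of `Runnables`, as long as the pause and resume behaviour stays the same.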




