echauchot commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r496729771
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws
Exception {
recoveredTestCheckpoint.awaitDiscard();
}
+ /**
+ * FLINK-17073 tests that there is no request triggered when there are
too many checkpoints
+ * waiting to clean and that it resumes when the number of waiting
checkpoints as gone below
+ * the threshold.
+ *
+ */
+ @Test
+ public void testChekpointingPausesAndResumeWhenTooManyCheckpoints()
throws Exception{
+ ManualClock clock = new ManualClock();
+ clock.advanceTime(1, TimeUnit.DAYS);
+ int maxCleaningCheckpoints = 1;
+ CheckpointsCleaner checkpointsCleaner = new
CheckpointsCleaner();
+ CheckpointRequestDecider checkpointRequestDecider = new
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+ final int maxCheckpointsToRetain = 1;
+ Executors.PausableThreadPoolExecutor executor =
Executors.pausableExecutor();
+ ZooKeeperCompletedCheckpointStore checkpointStore =
createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+ //pause the executor to pause checkpoints cleaning, to allow
assertions
+ executor.pause();
+
+ int nbCheckpointsToInject = 3;
+ for (int i = 1; i <= nbCheckpointsToInject; i++) {
+ // add checkpoints to clean
+ TestCompletedCheckpoint completedCheckpoint = new
TestCompletedCheckpoint(new JobID(), i,
+ i, Collections.emptyMap(),
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+ checkpointsCleaner::cleanCheckpoint);
+ checkpointStore.addCheckpoint(completedCheckpoint);
+ }
+
+ Thread.sleep(100L); // give time to submit checkpoints for
cleaning
+
+ int nbCheckpointsSubmittedForCleaningByCheckpointStore =
nbCheckpointsToInject - maxCheckpointsToRetain;
+
assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore,
checkpointsCleaner.getNumberOfCheckpointsToClean());
Review comment:
Hi @tillrohrmann thanks for your comment !
Regarding the code version, we are referring to the last commit
(https://github.com/apache/flink/pull/13040/commits/aaf4aca0d5ce666001035f6ae66bca841f87c74d).
We agreed on using a waiting loop to sync the executor and the test, the
only point of discussion is on the condition of this waiting loop: I did not
want to use the element that this test tests (the number of cleaning
checkpoints) as a condition for a waiting loop. I agree that using internals
for this condition is not ideal but do you have any condition to suggest for
these waiting loops other than the number of cleaning checkpoints or the number
of runnables submitted ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]