rmetzger commented on a change in pull request #15159:
URL: https://github.com/apache/flink/pull/15159#discussion_r598722325
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -178,12 +344,24 @@ void setExpectCreatingExecutionGraph() {
creatingExecutionGraphStateValidator.expectInput(none -> {});
}
- void runScheduledTasks() {
- for (ScheduledTask<Void> scheduledTask : scheduledTasks) {
- scheduledTask.execute();
+ void runScheduledTasks(long untilDelay) {
+ Iterable<ScheduledTask<Void>> copyOfScheduledTasks = new ArrayList<>(scheduledTasks);
+ Iterator<ScheduledTask<Void>> scheduledTaskIterator = copyOfScheduledTasks.iterator();
+ while (scheduledTaskIterator.hasNext()) {
+ ScheduledTask<Void> scheduledTask = scheduledTaskIterator.next();
+ if (scheduledTask.getDelay(TimeUnit.MILLISECONDS) <= untilDelay) {
+ scheduledTask.execute();
+ if (!scheduledTask.isPeriodic()) {
+ scheduledTaskIterator.remove();
+ }
+ }
Review comment:
But they will be executed by the next call to runScheduledTasks(). I only
want to execute tasks that have been scheduled up to now.
Immediately executing newly scheduled tasks can lead to loops: for example,
WaitingForResources.checkDesiredOrSufficientResourcesAvailable() reschedules
itself in some circumstances.
There was a flaw in my code, though: the removal mechanism doesn't work as
intended because it removes elements from the copy rather than from
scheduledTasks. My intention was to execute non-periodic tasks only once.
I've now addressed this in the code.
If you disagree with any of these changes, I'm happy to address this in a
hotfix alongside another pull request or a separate PR.
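
For illustration, here is a minimal, self-contained sketch of the behavior
described above: iterate over a snapshot so that tasks scheduled during the
run wait for the next call, but remove non-periodic tasks from the original
list so that they run only once. The class ScheduledTaskRunnerSketch and its
nested Task type are hypothetical stand-ins, not the actual test code or
Flink's ScheduledTask, and this is not necessarily how the fix in the PR
looks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the test context; not part of Flink.
class ScheduledTaskRunnerSketch {

    // Hypothetical simplified task: a fixed delay, a periodic flag, an action.
    static final class Task {
        final long delayMillis;
        final boolean periodic;
        final Runnable action;
        int executions = 0;

        Task(long delayMillis, boolean periodic, Runnable action) {
            this.delayMillis = delayMillis;
            this.periodic = periodic;
            this.action = action;
        }

        void execute() {
            executions++;
            action.run();
        }
    }

    final List<Task> scheduledTasks = new ArrayList<>();

    void schedule(Task task) {
        scheduledTasks.add(task);
    }

    void runScheduledTasks(long untilDelay) {
        // Iterate over a snapshot: tasks scheduled while this method runs are
        // added to scheduledTasks only and are picked up by the next call.
        List<Task> snapshot = new ArrayList<>(scheduledTasks);
        for (Task task : snapshot) {
            if (task.delayMillis <= untilDelay) {
                if (!task.periodic) {
                    // Remove from the original list, not from the snapshot,
                    // so a non-periodic task is executed only once.
                    scheduledTasks.remove(task);
                }
                task.execute();
            }
        }
    }

    public static void main(String[] args) {
        ScheduledTaskRunnerSketch runner = new ScheduledTaskRunnerSketch();
        int[] runs = {0};
        Runnable[] self = new Runnable[1];
        // A task that reschedules itself, similar in spirit to the
        // self-scheduling check mentioned in the comment.
        self[0] = () -> {
            runs[0]++;
            runner.schedule(new Task(0, false, self[0]));
        };
        runner.schedule(new Task(0, false, self[0]));

        runner.runScheduledTasks(0);
        System.out.println(runs[0]); // prints 1: no loop within a single call
        runner.runScheduledTasks(0);
        System.out.println(runs[0]); // prints 2
    }
}
```

With the snapshot, a self-rescheduling task advances by exactly one step per
call instead of looping inside a single call, which is the property the test
relies on.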