tillrohrmann commented on a change in pull request #15159:
URL: https://github.com/apache/flink/pull/15159#discussion_r596773120



##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -14,11 +14,17 @@
             <td>Integer</td>
             <td>Configure the minimum increase in parallelism for a job to 
scale up.</td>
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
+            <td style="word-wrap: break-word;">10 s</td>
+            <td>Duration</td>
+            <td>The resource stabilization timeout defines the time the 
JobManager will wait if fewer than the required resources are available, but 
sufficient resources for execution are there. Once this timeout has passed, the 
job will start executing with the available resources, or fail, if the 
resources are not sufficient. The timeout starts as soon as there are 
sufficient resources available for execution.<br />If <span 
markdown="span">`scheduler-mode`</span> is configured to <span 
markdown="span">`REACTIVE`</span>, this configuration value will default to 0, 
so that jobs are starting immediately with the available resources.</td>
+        </tr>
         <tr>
             
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
             <td>Duration</td>
-            <td>The maximum time the JobManager will wait to acquire all 
required resources after a job submission or restart. Once elapsed it will try 
to run the job with a lower parallelism, or fail if the minimum amount of 
resources could not be acquired.<br />Increasing this value will make the 
cluster more resilient against temporary resources shortages (e.g., there is 
more time for a failed TaskManager to be restarted), while decreasing this 
value reduces downtime of a job (provided that enough slots are available to 
still run the job).</td>
+            <td>The maximum time the JobManager will wait to acquire all 
required resources after a job submission or restart. Once elapsed it will try 
to run the job with a lower parallelism, or fail if the minimum amount of 
resources could not be acquired.<br />Increasing this value will make the 
cluster more resilient against temporary resources shortages (e.g., there is 
more time for a failed TaskManager to be restarted), while decreasing this 
value reduces downtime of a job (provided that enough slots are available to 
still run the job).<br />Setting a negative duration will disable the resource 
timeout: The JobManager will wait indefinitely for resources to appear.<br />If 
<span markdown="span">`scheduler-mode`</span> is configured to <span 
markdown="span">`REACTIVE`</span>, this configuration value will default to a 
negative value to disable the resource timeout.</td>

Review comment:
       Is it still correct to say that decreasing this value will reduce the 
downtime of jobs? If we have sufficient resources, then 
`jobmanager.adaptive-scheduler.resource-stabilization-timeout` should be the 
decisive factor how long the system waits to restart the job.

##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -14,11 +14,17 @@
             <td>Integer</td>
             <td>Configure the minimum increase in parallelism for a job to 
scale up.</td>
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
+            <td style="word-wrap: break-word;">10 s</td>
+            <td>Duration</td>
+            <td>The resource stabilization timeout defines the time the 
JobManager will wait if fewer than the required resources are available, but 
sufficient resources for execution are there. Once this timeout has passed, the 
job will start executing with the available resources, or fail, if the 
resources are not sufficient. The timeout starts as soon as there are 
sufficient resources available for execution.<br />If <span 
markdown="span">`scheduler-mode`</span> is configured to <span 
markdown="span">`REACTIVE`</span>, this configuration value will default to 0, 
so that jobs are starting immediately with the available resources.</td>

Review comment:
       Maybe: `The resource stabilization timeout defines the time the 
JobManager will wait if fewer than the desired but sufficient resources are 
available.`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
##########
@@ -90,13 +107,40 @@ public Logger getLogger() {
 
     @Override
     public void notifyNewResourcesAvailable() {
-        if (context.hasEnoughResources(desiredResources)) {
+        checkDesiredOrSufficientResourcesAvailable();
+    }
+
+    private void checkDesiredOrSufficientResourcesAvailable() {
+        if (context.hasDesiredResources(desiredResources)) {
             createExecutionGraphWithAvailableResources();
+            return;
         }
+
+        if (context.hasSufficientResources()) {
+            Deadline deadline = initializeOrGetResourceStabilizationDeadline();
+            if (deadline.isOverdue()) {
+                createExecutionGraphWithAvailableResources();
+            } else {
+                // schedule next resource check
+                context.runIfState(
+                        this,
+                        this::checkDesiredOrSufficientResourcesAvailable,
+                        deadline.timeLeft());
+            }
+        }

Review comment:
       Shouldn't we clear the resource stabilization deadline if we no longer 
have sufficient resources so that we can start a new deadline once we have 
sufficient resources again?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
     @Test
     public void testNotifyNewResourcesAvailable() throws Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
-            ctx.setHasEnoughResources(() -> true); // make resources available
+                    new WaitingForResources(
+                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+            ctx.setHasDesiredResources(() -> true); // make resources available
             ctx.setExpectExecuting(assertNonNull());
             wfr.notifyNewResourcesAvailable(); // .. and notify
         }
     }
 
     @Test
-    public void testResourceTimeout() throws Exception {
+    public void 
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration noStabilizationTimeout = Duration.ofMillis(0);
+            WaitingForResources wfr =
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            noStabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false);
+
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            stabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.notifyNewResourcesAvailable();
+            // we are not triggering the scheduled tasks, to simulate a long 
stabilization timeout
+
+            assertThat(ctx.hasStateTransition(), is(false));
+        }
+    }
+
+    @Test
+    public void testSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration initialResourceTimeout = Duration.ofMillis(120948);
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+            TestingWaitingForResources wfr =
+                    new TestingWaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            initialResourceTimeout,
+                            stabilizationTimeout);
+
+            // not enough resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> false);
+
+            assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+            // sufficient resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
 
             ctx.setExpectExecuting(assertNonNull());
+            // start countdown
+            wfr.notifyNewResourcesAvailable();
+
+            assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+            // advance time to now
+            wfr.setTestDeadline(Deadline.now());
 
-            // immediately execute all scheduled runnables
-            assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+            // execute all runnables and trigger expected state transition
             for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
-                if (scheduledRunnable.getExpectedState() == wfr) {
+                if (scheduledRunnable.getExpectedState() == wfr
+                        && 
!scheduledRunnable.getDelay().equals(initialResourceTimeout)
+                        && 
!scheduledRunnable.getDelay().equals(Duration.ZERO)) {
                     scheduledRunnable.runAction();
                 }
             }
         }
     }
 
+    private static void assertNoStateTransitionsAfterExecutingRunnables(
+            MockContext ctx, WaitingForResources wfr) {
+        Iterator<ScheduledRunnable> runnableIterator = 
ctx.getScheduledRunnables().iterator();
+        while (runnableIterator.hasNext()) {
+            ScheduledRunnable scheduledRunnable = runnableIterator.next();
+            if (scheduledRunnable.getExpectedState() == wfr
+                    && scheduledRunnable.getDeadline().isOverdue()) {
+                scheduledRunnable.runAction();
+                runnableIterator.remove();
+            }
+        }
+        assertThat(ctx.hasStateTransition, is(false));
+    }
+
+    private static class TestingWaitingForResources extends 
WaitingForResources {
+
+        private Deadline testDeadline;
+
+        TestingWaitingForResources(
+                Context context,
+                Logger log,
+                ResourceCounter desiredResources,
+                Duration initialResourceAllocationTimeout,
+                Duration resourceStabilizationTimeout) {
+            super(
+                    context,
+                    log,
+                    desiredResources,
+                    initialResourceAllocationTimeout,
+                    resourceStabilizationTimeout);
+        }
+
+        @Override
+        protected Deadline initializeOrGetResourceStabilizationDeadline() {
+            return testDeadline;
+        }
+
+        public void setTestDeadline(Deadline testDeadline) {
+            this.testDeadline = testDeadline;
+        }
+    }
+
+    @Test
+    public void testNoStateTransitionOnNoResourceTimeout() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasDesiredResources(() -> false);
+            WaitingForResources wfr =
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofMillis(-1),
+                            STABILIZATION_TIMEOUT);
+
+            executeAllScheduledRunnables(ctx, wfr);
+
+            assertThat(ctx.hasStateTransition(), is(false));
+        }
+    }
+
+    @Test
+    public void testStateTransitionOnResourceTimeout() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasDesiredResources(() -> false);
+            WaitingForResources wfr =
+                    new WaitingForResources(
+                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+
+            ctx.setExpectExecuting(assertNonNull());
+
+            executeAllScheduledRunnables(ctx, wfr);
+        }
+    }
+

Review comment:
       I think we are lacking a test where we have sufficient resources, then 
again no sufficient resources and then restart the timeout once we have 
sufficient resources again (e.g. testing that the sufficient resource timeout 
can be restarted).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
##########
@@ -90,13 +107,40 @@ public Logger getLogger() {
 
     @Override
     public void notifyNewResourcesAvailable() {
-        if (context.hasEnoughResources(desiredResources)) {
+        checkDesiredOrSufficientResourcesAvailable();
+    }
+
+    private void checkDesiredOrSufficientResourcesAvailable() {
+        if (context.hasDesiredResources(desiredResources)) {
             createExecutionGraphWithAvailableResources();
+            return;
         }
+
+        if (context.hasSufficientResources()) {

Review comment:
       One minor idea could be to unify `hasDesiredResources` and 
`hasSufficientResources`. I think both methods do effectively the same and 
could signal the different results with a more expressive return type.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -411,6 +411,35 @@
                                     .text(
                                             "Increasing this value will make 
the cluster more resilient against temporary resources shortages (e.g., there 
is more time for a failed TaskManager to be restarted), "
                                                     + "while decreasing this 
value reduces downtime of a job (provided that enough slots are available to 
still run the job).")
+                                    .linebreak()
+                                    .text(
+                                            "Setting a negative duration will 
disable the resource timeout: The JobManager will wait indefinitely for 
resources to appear.")
+                                    .linebreak()
+                                    .text(
+                                            "If %s is configured to %s, this 
configuration value will default to a negative value to disable the resource 
timeout.",
+                                            code(SCHEDULER_MODE.key()),
+                                            
code(SchedulerExecutionMode.REACTIVE.name()))
+                                    .build());
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT =
+            key("jobmanager.adaptive-scheduler.resource-stabilization-timeout")
+                    .durationType()
+                    .defaultValue(RESOURCE_WAIT_TIMEOUT.defaultValue())
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The resource stabilization 
timeout defines the time the JobManager will wait "

Review comment:
       I like Chesnay's proposal.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
##########
@@ -193,4 +197,37 @@ public static DefaultSlotPoolServiceSchedulerFactory 
fromConfiguration(
         return new DefaultSlotPoolServiceSchedulerFactory(
                 slotPoolServiceFactory, schedulerNGFactory);
     }
+
+    private static AdaptiveSchedulerFactory 
getAdaptiveSchedulerFactoryFromConfiguration(
+            Configuration configuration) {
+        Duration initialResourceAllocationTimeout =
+                returnValueOrReplaceDefaultIfReactiveMode(
+                        configuration,
+                        JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
+                        Duration.ofMillis(-1));
+        Duration resourceStabilizationTimeout =
+                returnValueOrReplaceDefaultIfReactiveMode(
+                        configuration,
+                        JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT,
+                        Duration.ZERO);

Review comment:
       Would it be simpler to define the defaults based on 
`configuration.get(JobManagerOptions.SCHEDULER_MODE)` here and then simply call 
`configuration.getOptional(option).orElseGet(() -> defaultValue)` where 
`defaultValue` has been specified before?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
     @Test
     public void testNotifyNewResourcesAvailable() throws Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
-            ctx.setHasEnoughResources(() -> true); // make resources available
+                    new WaitingForResources(
+                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+            ctx.setHasDesiredResources(() -> true); // make resources available
             ctx.setExpectExecuting(assertNonNull());
             wfr.notifyNewResourcesAvailable(); // .. and notify
         }
     }
 
     @Test
-    public void testResourceTimeout() throws Exception {
+    public void 
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration noStabilizationTimeout = Duration.ofMillis(0);
+            WaitingForResources wfr =
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            noStabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false);
+
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            stabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.notifyNewResourcesAvailable();
+            // we are not triggering the scheduled tasks, to simulate a long 
stabilization timeout
+
+            assertThat(ctx.hasStateTransition(), is(false));
+        }
+    }
+
+    @Test
+    public void testSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration initialResourceTimeout = Duration.ofMillis(120948);
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+            TestingWaitingForResources wfr =
+                    new TestingWaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            initialResourceTimeout,
+                            stabilizationTimeout);
+
+            // not enough resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> false);
+
+            assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+            // sufficient resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));

Review comment:
       This leaks quite some implementation details. If we need to go to these 
lengths then this is usually an indicator that the class is not well testable.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
     @Test
     public void testNotifyNewResourcesAvailable() throws Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
-            ctx.setHasEnoughResources(() -> true); // make resources available
+                    new WaitingForResources(
+                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+            ctx.setHasDesiredResources(() -> true); // make resources available
             ctx.setExpectExecuting(assertNonNull());
             wfr.notifyNewResourcesAvailable(); // .. and notify
         }
     }
 
     @Test
-    public void testResourceTimeout() throws Exception {
+    public void 
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration noStabilizationTimeout = Duration.ofMillis(0);
+            WaitingForResources wfr =
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            noStabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false);
+
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            stabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.notifyNewResourcesAvailable();
+            // we are not triggering the scheduled tasks, to simulate a long 
stabilization timeout
+
+            assertThat(ctx.hasStateTransition(), is(false));
+        }
+    }
+
+    @Test
+    public void testSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration initialResourceTimeout = Duration.ofMillis(120948);
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+            TestingWaitingForResources wfr =
+                    new TestingWaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            initialResourceTimeout,
+                            stabilizationTimeout);
+
+            // not enough resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> false);
+
+            assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+            // sufficient resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
 
             ctx.setExpectExecuting(assertNonNull());
+            // start countdown
+            wfr.notifyNewResourcesAvailable();
+
+            assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+            // advance time to now
+            wfr.setTestDeadline(Deadline.now());

Review comment:
       For these timeout tests, it might make sense to introduce a `Clock` for 
the global test time. This `Clock` could be used by the state as well as the 
test context. Then we could say that we can advance the clock which would 
automatically run all runnables which are eligible now. That way we might be 
able to express the tests a bit more naturally.

##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -14,11 +14,17 @@
             <td>Integer</td>
             <td>Configure the minimum increase in parallelism for a job to 
scale up.</td>
         </tr>
+        <tr>
+            
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
+            <td style="word-wrap: break-word;">10 s</td>
+            <td>Duration</td>
+            <td>The resource stabilization timeout defines the time the 
JobManager will wait if fewer than the required resources are available, but 
sufficient resources for execution are there. Once this timeout has passed, the 
job will start executing with the available resources, or fail, if the 
resources are not sufficient. The timeout starts as soon as there are 
sufficient resources available for execution.<br />If <span 
markdown="span">`scheduler-mode`</span> is configured to <span 
markdown="span">`REACTIVE`</span>, this configuration value will default to 0, 
so that jobs are starting immediately with the available resources.</td>

Review comment:
       Why are we failing if there are not sufficient resources available? I 
thought that this timeout would only be triggered if we have sufficient 
resources?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +117,187 @@ public void testNotEnoughResources() throws Exception {
     @Test
     public void testNotifyNewResourcesAvailable() throws Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
-            ctx.setHasEnoughResources(() -> true); // make resources available
+                    new WaitingForResources(
+                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+            ctx.setHasDesiredResources(() -> true); // make resources available
             ctx.setExpectExecuting(assertNonNull());
             wfr.notifyNewResourcesAvailable(); // .. and notify
         }
     }
 
     @Test
-    public void testResourceTimeout() throws Exception {
+    public void 
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration noStabilizationTimeout = Duration.ofMillis(0);
+            WaitingForResources wfr =
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            noStabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false);
+
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, 
Duration.ZERO);
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            stabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.notifyNewResourcesAvailable();
+            // we are not triggering the scheduled tasks, to simulate a long 
stabilization timeout
+
+            assertThat(ctx.hasStateTransition(), is(false));
+        }
+    }
+
+    @Test
+    public void testSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration initialResourceTimeout = Duration.ofMillis(120948);
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+            TestingWaitingForResources wfr =
+                    new TestingWaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            initialResourceTimeout,
+                            stabilizationTimeout);
+
+            // not enough resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> false);
+
+            assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+            // sufficient resources available
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
 
             ctx.setExpectExecuting(assertNonNull());
+            // start countdown
+            wfr.notifyNewResourcesAvailable();
+
+            assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);
+
+            // advance time to now
+            wfr.setTestDeadline(Deadline.now());
 
-            // immediately execute all scheduled runnables
-            assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+            // execute all runnables and trigger expected state transition
             for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
-                if (scheduledRunnable.getExpectedState() == wfr) {
+                if (scheduledRunnable.getExpectedState() == wfr
+                        && 
!scheduledRunnable.getDelay().equals(initialResourceTimeout)
+                        && 
!scheduledRunnable.getDelay().equals(Duration.ZERO)) {

Review comment:
       Why do we need these filters here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to