This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 9802d240b34 [BP-1.18][FLINK-34274][runtime] Implicitly disable resource wait timeout for AdaptiveSchedulerTest (#24399)
9802d240b34 is described below

commit 9802d240b341633f460fe15a89a263f12e540fe4
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Feb 28 15:23:37 2024 +0100

    [BP-1.18][FLINK-34274][runtime] Implicitly disable resource wait timeout for AdaptiveSchedulerTest (#24399)
---
 .../adaptive/AdaptiveSchedulerBuilder.java          |  7 +++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java   | 21 +++++++++------------
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 6a7f855141e..c2ddef6417b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -53,6 +53,7 @@ import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
 
 /** Builder for {@link AdaptiveScheduler}. */
 public class AdaptiveSchedulerBuilder {
@@ -118,6 +119,12 @@ public class AdaptiveSchedulerBuilder {
         return this;
     }
 
+    public AdaptiveSchedulerBuilder withConfigurationOverride(
+            Function<Configuration, Configuration> modifyFn) {
+        this.jobMasterConfiguration = modifyFn.apply(jobMasterConfiguration);
+        return this;
+    }
+
     public AdaptiveSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
         this.userCodeLoader = userCodeLoader;
         return this;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index be927fe81bf..98074b58d37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -1218,9 +1218,12 @@ public class AdaptiveSchedulerTest {
         JobResourceRequirements initialJobResourceRequirements =
                 createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM);
 
+        final Configuration config = getConfigurationWithNoTimeouts();
+        // enable the ResourceWaitTimeout to trigger the immediate failure
+        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1));
         final AdaptiveScheduler scheduler =
                 prepareScheduler(jobGraph, declarativeSlotPool)
-                        .setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout())
+                        .setJobMasterConfiguration(config)
                         .setJobResourceRequirements(initialJobResourceRequirements)
                         .build();
 
@@ -1252,6 +1255,7 @@ public class AdaptiveSchedulerTest {
 
         final AdaptiveScheduler scheduler =
                 prepareScheduler(jobGraph, declarativeSlotPool)
+                        .setJobMasterConfiguration(getConfigurationWithNoTimeouts())
                         .setJobResourceRequirements(initialJobResourceRequirements)
                         .build();
 
@@ -1261,14 +1265,6 @@ public class AdaptiveSchedulerTest {
         startJobWithSlotsMatchingParallelism(
                 scheduler, declarativeSlotPool, taskManagerGateway, availableSlots);
 
-        // at this point we'd ideally check that the job is stuck in WaitingForResources, but we
-        // can't differentiate between waiting due to the minimum requirements not being fulfilled
-        // and the resource timeout not being elapsed
-        // We just continue here, as the following tests validate that the lower bound can prevent
-        // a job from running:
-        // - #testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure()
-        // - #testRequirementLowerBoundIncreaseBeyondCurrentParallelismAttemptsImmediateRescale()
-
         // unlock job by decreasing the parallelism
         JobResourceRequirements newJobResourceRequirements =
                 createRequirementsWithLowerAndUpperParallelism(availableSlots, PARALLELISM);
@@ -1285,15 +1281,16 @@ public class AdaptiveSchedulerTest {
                 .setDeclarativeSlotPool(declarativeSlotPool);
     }
 
-    private static Configuration getConfigurationWithNoResourceWaitTimeout() {
+    private static Configuration getConfigurationWithNoTimeouts() {
         return new Configuration()
-                .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+                .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1L))
+                .set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(1L));
     }
 
     private AdaptiveSchedulerBuilder prepareSchedulerWithNoResourceWaitTimeout(
             JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) {
         return prepareScheduler(jobGraph, declarativeSlotPool)
-                .setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout());
+                .setJobMasterConfiguration(getConfigurationWithNoTimeouts());
     }
 
     private AdaptiveScheduler createSchedulerWithNoResourceWaitTimeout(

Reply via email to