This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9699cf4350b6675ece6516cf690fe237b3a1e3f8
Author: David Moravek <[email protected]>
AuthorDate: Mon Nov 6 14:00:57 2023 +0100

    [hotfix] Speed up AdaptiveSchedulerTest by setting scalingIntervalMin to 
1ms.
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 49 +++++++++-------------
 1 file changed, 20 insertions(+), 29 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 641094ab100..e854cd7d571 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -525,9 +525,8 @@ public class AdaptiveSchedulerTest {
                         Time.minutes(10),
                         Time.minutes(10));
 
-        final Configuration configuration = new Configuration();
+        final Configuration configuration = 
createConfigurationWithNoTimeouts();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
-        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
 
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(
@@ -612,7 +611,7 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final Configuration configuration = new Configuration();
+        final Configuration configuration = 
createConfigurationWithNoTimeouts();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(10L));
         configuration.set(
@@ -977,16 +976,13 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
-
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setDeclarativeSlotPool(declarativeSlotPool)
-                        .setJobMasterConfiguration(configuration)
+                        
.setJobMasterConfiguration(createConfigurationWithNoTimeouts())
                         .build();
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
@@ -1049,7 +1045,7 @@ public class AdaptiveSchedulerTest {
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
         final AdaptiveScheduler scheduler =
-                createSchedulerWithNoResourceWaitTimeout(jobGraph, 
declarativeSlotPool);
+                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
 
         final int scaledUpParallelism = PARALLELISM * 2;
 
@@ -1085,7 +1081,7 @@ public class AdaptiveSchedulerTest {
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
         final AdaptiveScheduler scheduler =
-                createSchedulerWithNoResourceWaitTimeout(jobGraph, 
declarativeSlotPool);
+                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 createSubmissionBufferingTaskManagerGateway(PARALLELISM, 
scheduler);
@@ -1112,7 +1108,7 @@ public class AdaptiveSchedulerTest {
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
         final AdaptiveScheduler scheduler =
-                createSchedulerWithNoResourceWaitTimeout(jobGraph, 
declarativeSlotPool);
+                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 createSubmissionBufferingTaskManagerGateway(PARALLELISM, 
scheduler);
@@ -1149,7 +1145,7 @@ public class AdaptiveSchedulerTest {
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
         final AdaptiveScheduler scheduler =
-                createSchedulerWithNoResourceWaitTimeout(jobGraph, 
declarativeSlotPool);
+                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
         int scaledUpParallelism = PARALLELISM * 10;
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
@@ -1219,8 +1215,7 @@ public class AdaptiveSchedulerTest {
                 
createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM);
 
         final AdaptiveScheduler scheduler =
-                prepareScheduler(jobGraph, declarativeSlotPool)
-                        
.setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout())
+                prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool)
                         
.setJobResourceRequirements(initialJobResourceRequirements)
                         .build();
 
@@ -1251,7 +1246,7 @@ public class AdaptiveSchedulerTest {
                 
createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM);
 
         final AdaptiveScheduler scheduler =
-                prepareScheduler(jobGraph, declarativeSlotPool)
+                prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool)
                         
.setJobResourceRequirements(initialJobResourceRequirements)
                         .build();
 
@@ -1278,27 +1273,23 @@ public class AdaptiveSchedulerTest {
         awaitJobReachingParallelism(taskManagerGateway, scheduler, 
availableSlots);
     }
 
-    private AdaptiveSchedulerBuilder prepareScheduler(
-            JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) {
-        return new AdaptiveSchedulerBuilder(
-                        jobGraph, singleThreadMainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                .setDeclarativeSlotPool(declarativeSlotPool);
-    }
-
-    private static Configuration getConfigurationWithNoResourceWaitTimeout() {
+    private static Configuration createConfigurationWithNoTimeouts() {
         return new Configuration()
-                .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+                .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L))
+                .set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, 
Duration.ofMillis(1L));
     }
 
-    private AdaptiveSchedulerBuilder prepareSchedulerWithNoResourceWaitTimeout(
+    private AdaptiveSchedulerBuilder prepareSchedulerWithNoTimeouts(
             JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) {
-        return prepareScheduler(jobGraph, declarativeSlotPool)
-                
.setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout());
+        return new AdaptiveSchedulerBuilder(
+                        jobGraph, singleThreadMainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                .setDeclarativeSlotPool(declarativeSlotPool)
+                
.setJobMasterConfiguration(createConfigurationWithNoTimeouts());
     }
 
-    private AdaptiveScheduler createSchedulerWithNoResourceWaitTimeout(
+    private AdaptiveScheduler createSchedulerWithNoTimeouts(
             JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) throws 
Exception {
-        return prepareSchedulerWithNoResourceWaitTimeout(jobGraph, 
declarativeSlotPool).build();
+        return prepareSchedulerWithNoTimeouts(jobGraph, 
declarativeSlotPool).build();
     }
 
     private SubmissionBufferingTaskManagerGateway 
createSubmissionBufferingTaskManagerGateway(
@@ -1922,7 +1913,7 @@ public class AdaptiveSchedulerTest {
         final JobGraph jobGraph = createJobGraph();
         final Duration slotIdleTimeout = Duration.ofMillis(10);
 
-        final Configuration configuration = new Configuration();
+        final Configuration configuration = 
createConfigurationWithNoTimeouts();
         configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, 
slotIdleTimeout.toMillis());
 
         final DeclarativeSlotPool declarativeSlotPool =

Reply via email to