This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9699cf4350b6675ece6516cf690fe237b3a1e3f8 Author: David Moravek <[email protected]> AuthorDate: Mon Nov 6 14:00:57 2023 +0100 [hotfix] Speed up AdaptiveSchedulerTest by setting scalingIntervalMin to 1ms. --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 49 +++++++++------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 641094ab100..e854cd7d571 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -525,9 +525,8 @@ public class AdaptiveSchedulerTest { Time.minutes(10), Time.minutes(10)); - final Configuration configuration = new Configuration(); + final Configuration configuration = createConfigurationWithNoTimeouts(); configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1); - configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); final AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder( @@ -612,7 +611,7 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final Configuration configuration = new Configuration(); + final Configuration configuration = createConfigurationWithNoTimeouts(); configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1); configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L)); configuration.set( @@ -977,16 +976,13 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final Configuration configuration = new Configuration(); - configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); - final AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setDeclarativeSlotPool(declarativeSlotPool) - .setJobMasterConfiguration(configuration) + .setJobMasterConfiguration(createConfigurationWithNoTimeouts()) .build(); final SubmissionBufferingTaskManagerGateway taskManagerGateway = @@ -1049,7 +1045,7 @@ public class AdaptiveSchedulerTest { createDeclarativeSlotPool(jobGraph.getJobID()); final AdaptiveScheduler scheduler = - createSchedulerWithNoResourceWaitTimeout(jobGraph, declarativeSlotPool); + createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); final int scaledUpParallelism = PARALLELISM * 2; @@ -1085,7 +1081,7 @@ public class AdaptiveSchedulerTest { createDeclarativeSlotPool(jobGraph.getJobID()); final AdaptiveScheduler scheduler = - createSchedulerWithNoResourceWaitTimeout(jobGraph, declarativeSlotPool); + createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); final SubmissionBufferingTaskManagerGateway taskManagerGateway = createSubmissionBufferingTaskManagerGateway(PARALLELISM, scheduler); @@ -1112,7 +1108,7 @@ public class AdaptiveSchedulerTest { createDeclarativeSlotPool(jobGraph.getJobID()); final AdaptiveScheduler scheduler = - createSchedulerWithNoResourceWaitTimeout(jobGraph, declarativeSlotPool); + createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); final SubmissionBufferingTaskManagerGateway taskManagerGateway = createSubmissionBufferingTaskManagerGateway(PARALLELISM, scheduler); @@ -1149,7 +1145,7 @@ public class AdaptiveSchedulerTest { createDeclarativeSlotPool(jobGraph.getJobID()); final AdaptiveScheduler scheduler = - createSchedulerWithNoResourceWaitTimeout(jobGraph, declarativeSlotPool); + createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); int scaledUpParallelism = PARALLELISM * 10; final SubmissionBufferingTaskManagerGateway taskManagerGateway = @@ -1219,8 +1215,7 @@ public class AdaptiveSchedulerTest { createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM); final AdaptiveScheduler scheduler = - prepareScheduler(jobGraph, declarativeSlotPool) - .setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout()) + prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool) .setJobResourceRequirements(initialJobResourceRequirements) .build(); @@ -1251,7 +1246,7 @@ public class AdaptiveSchedulerTest { createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM); final AdaptiveScheduler scheduler = - prepareScheduler(jobGraph, declarativeSlotPool) + prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool) .setJobResourceRequirements(initialJobResourceRequirements) .build(); @@ -1278,27 +1273,23 @@ public class AdaptiveSchedulerTest { awaitJobReachingParallelism(taskManagerGateway, scheduler, availableSlots); } - private AdaptiveSchedulerBuilder prepareScheduler( - JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) { - return new AdaptiveSchedulerBuilder( - jobGraph, singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) - .setDeclarativeSlotPool(declarativeSlotPool); - } - - private static Configuration getConfigurationWithNoResourceWaitTimeout() { + private static Configuration createConfigurationWithNoTimeouts() { return new Configuration() - .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); + .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)) + .set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(1L)); } - private AdaptiveSchedulerBuilder prepareSchedulerWithNoResourceWaitTimeout( + private AdaptiveSchedulerBuilder prepareSchedulerWithNoTimeouts( JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) { - return prepareScheduler(jobGraph, declarativeSlotPool) - .setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout()); + return new AdaptiveSchedulerBuilder( + jobGraph, singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(createConfigurationWithNoTimeouts()); } - private AdaptiveScheduler createSchedulerWithNoResourceWaitTimeout( + private AdaptiveScheduler createSchedulerWithNoTimeouts( JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) throws Exception { - return prepareSchedulerWithNoResourceWaitTimeout(jobGraph, declarativeSlotPool).build(); + return prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool).build(); } private SubmissionBufferingTaskManagerGateway createSubmissionBufferingTaskManagerGateway( @@ -1922,7 +1913,7 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final Duration slotIdleTimeout = Duration.ofMillis(10); - final Configuration configuration = new Configuration(); + final Configuration configuration = createConfigurationWithNoTimeouts(); configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout.toMillis()); final DeclarativeSlotPool declarativeSlotPool =
