gyfora commented on code in PR #22883:
URL: https://github.com/apache/flink/pull/22883#discussion_r1246367717
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -1041,15 +1103,170 @@ void testRequirementDecreaseTriggersScaleDown() throws
Exception {
awaitJobReachingParallelism(taskManagerGateway, scheduler,
scaledDownParallelism);
}
+ @Test
+ void
testRequirementLowerBoundIncreaseBelowCurrentParallelismDoesNotTriggerRescale()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final AdaptiveScheduler scheduler =
+ createSchedulerWithNoResourceWaitTimeout(jobGraph,
declarativeSlotPool);
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ createSubmissionBufferingTaskManagerGateway(PARALLELISM,
scheduler);
+
+ startJobWithSlotsMatchingParallelism(
+ scheduler, declarativeSlotPool, taskManagerGateway,
PARALLELISM);
+ awaitJobReachingParallelism(taskManagerGateway, scheduler,
PARALLELISM);
+
+ final JobResourceRequirements newJobResourceRequirements =
+
createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM);
+
+ final CompletableFuture<Void> asyncAssertion =
+ CompletableFuture.runAsync(
+ () -> {
+ State state = scheduler.getState();
+
scheduler.updateJobResourceRequirements(newJobResourceRequirements);
+
+ // scheduler shouldn't change states
+ assertThat(scheduler.getState()).isSameAs(state);
+ // no new tasks should have been scheduled
+
assertThat(taskManagerGateway.submittedTasks).isEmpty();
+ },
+ singleThreadMainThreadExecutor);
+
+ FlinkAssertions.assertThatFuture(asyncAssertion).eventuallySucceeds();
+ }
+
+ @Test
+ void
testRequirementLowerBoundIncreaseBeyondCurrentParallelismKeepsJobRunning()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final AdaptiveScheduler scheduler =
+ createSchedulerWithNoResourceWaitTimeout(jobGraph,
declarativeSlotPool);
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ createSubmissionBufferingTaskManagerGateway(PARALLELISM,
scheduler);
+
+ startJobWithSlotsMatchingParallelism(
+ scheduler, declarativeSlotPool, taskManagerGateway,
PARALLELISM);
+ awaitJobReachingParallelism(taskManagerGateway, scheduler,
PARALLELISM);
+
+ int scaledUpParallelism = PARALLELISM * 10;
+ JobResourceRequirements newJobResourceRequirements =
+
createRequirementsWithEqualLowerAndUpperParallelism(scaledUpParallelism);
+
+ FlinkAssertions.assertThatFuture(
+ CompletableFuture.runAsync(
+ () -> {
+ final State originalState =
scheduler.getState();
+ scheduler.updateJobResourceRequirements(
+ newJobResourceRequirements);
+
assertThat(scheduler.getState()).isSameAs(originalState);
+ },
+ singleThreadMainThreadExecutor))
Review Comment:
I think we may be missing the test case that this actually scales up once
the resources are available for the new higher min parallelism. Maybe that
could be added here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]