zentol commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1150229502
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -951,6 +949,138 @@ void testConsistentMaxParallelism() throws Exception {
assertThat(resubmittedArchivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
}
+ @Test
+ void testRequirementIncreaseTriggersScaleUp() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(1L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph,
singleThreadMainThreadExecutor)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .setJobMasterConfiguration(configuration)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ final int scaledUpParallelism = PARALLELISM * 2;
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+
createSubmissionBufferingTaskManagerGateway(scaledUpParallelism, scheduler);
+
+ startJobWithSlotsMatchingParallelism(
+ scheduler, declarativeSlotPool, taskManagerGateway,
PARALLELISM);
+ awaitJobReachingParallelism(taskManagerGateway, scheduler,
PARALLELISM);
+
+ JobResourceRequirements newJobResourceRequirements =
+ createRequirementsWithUpperParallelism(scaledUpParallelism);
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ // first update requirements as otherwise slots are
rejected!
+
scheduler.updateJobResourceRequirements(newJobResourceRequirements);
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(
+ ResourceProfile.UNKNOWN,
PARALLELISM)),
+ taskManagerGateway);
+ });
+
+ awaitJobReachingParallelism(taskManagerGateway, scheduler,
scaledUpParallelism);
+ }
+
+ @Test
+ void testRequirementDecreaseTriggersScaleDown() throws Exception {
Review Comment:
Ill extend the code sharing slightly (scheduler construction).
I assume you're referring to restructuring these tests with something like
`testRequirementChangeTriggersRescaling(<some provider for new requirements>,
<some optional lambda to offer slots>, <some other parameter>)`.
In my experience such tests are less readable and harder to maintain in the
long-run.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]