zentol commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1150221233


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -951,6 +949,138 @@ void testConsistentMaxParallelism() throws Exception {
         
assertThat(resubmittedArchivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
     }
 
+    @Test
+    void testRequirementIncreaseTriggersScaleUp() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final int scaledUpParallelism = PARALLELISM * 2;
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                
createSubmissionBufferingTaskManagerGateway(scaledUpParallelism, scheduler);
+
+        startJobWithSlotsMatchingParallelism(
+                scheduler, declarativeSlotPool, taskManagerGateway, 
PARALLELISM);
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, 
PARALLELISM);
+
+        JobResourceRequirements newJobResourceRequirements =
+                createRequirementsWithUpperParallelism(scaledUpParallelism);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    // first update requirements as otherwise slots are 
rejected!
+                    
scheduler.updateJobResourceRequirements(newJobResourceRequirements);
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
PARALLELISM)),
+                            taskManagerGateway);
+                });
+
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, 
scaledUpParallelism);
+    }
+
+    @Test
+    void testRequirementDecreaseTriggersScaleDown() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                createSubmissionBufferingTaskManagerGateway(PARALLELISM, 
scheduler);
+
+        startJobWithSlotsMatchingParallelism(
+                scheduler, declarativeSlotPool, taskManagerGateway, 
PARALLELISM);
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, 
PARALLELISM);
+
+        int scaledDownParallelism = PARALLELISM / 2;

Review Comment:
   Are you referring to the minimum parallelism increase stuff? That should 
already be covered by the controller unit test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to