1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1361760710


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1162,16 +1167,24 @@ private ExecutionGraph createExecutionGraphAndRestoreState(
                 LOG);
     }
 
+    /**
+     * In regular mode, rescale the job if added resource meets {@link
+     * JobManagerOptions#MIN_PARALLELISM_INCREASE}. In force mode rescale if the parallelism has
+     * changed.
+     */
     @Override
-    public boolean shouldRescale(ExecutionGraph executionGraph) {
+    public boolean shouldRescale(ExecutionGraph executionGraph, boolean forceRescale) {

Review Comment:
   Would you mind adding a test for this method that checks the return value both when `forceRescale` is `true` and when it is `false`?
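   A minimal sketch of such a test, for illustration only: it does not use the real `AdaptiveScheduler`. The hypothetical class `ShouldRescaleSketch` models the rule from the Javadoc in the diff with plain integers, assuming that "added resource meets `MIN_PARALLELISM_INCREASE`" means the parallelism would grow by at least that amount. A real test would build an `ExecutionGraph` and the available resources instead.

   ```java
   /** Hypothetical model of the shouldRescale rule described in the Javadoc; not Flink code. */
   final class ShouldRescaleSketch {

       /**
        * Regular mode: rescale only if the parallelism grows by at least minParallelismIncrease
        * (assumed meaning of JobManagerOptions#MIN_PARALLELISM_INCREASE).
        * Force mode: rescale whenever the parallelism changes.
        */
       static boolean shouldRescale(
               int currentParallelism,
               int availableParallelism,
               int minParallelismIncrease,
               boolean forceRescale) {
           if (forceRescale) {
               return availableParallelism != currentParallelism;
           }
           return availableParallelism - currentParallelism >= minParallelismIncrease;
       }

       private static void check(boolean condition, String message) {
           if (!condition) {
               throw new AssertionError(message);
           }
       }

       public static void main(String[] args) {
           // Increase of 1 with a required minimum of 2: only force mode rescales.
           check(!shouldRescale(4, 5, 2, false), "regular mode, increase below minimum");
           check(shouldRescale(4, 5, 2, true), "force mode, parallelism changed");
           // Increase of 2 meets the minimum in regular mode.
           check(shouldRescale(4, 6, 2, false), "regular mode, increase meets minimum");
           // Unchanged parallelism: neither mode rescales.
           check(!shouldRescale(4, 4, 2, false), "regular mode, no change");
           check(!shouldRescale(4, 4, 2, true), "force mode, no change");
           System.out.println("all checks passed");
       }
   }
   ```

   Each pair of checks uses the same inputs with `forceRescale` flipped, which is the contrast the requested test should pin down.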



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws Exception {
     }
 
     @Test
-    public void testNotifyNewResourcesAvailableWithCanScaleUpTransitionsToRestarting()
+    public void testNotifyNewResourcesAvailableBeforeCooldownIsOverScheduledStateChange()
+            throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            // do not wait too long in the test
+            final Duration scalingIntervalMin = Duration.ofSeconds(1L);
+            final ExecutingStateBuilder executingStateBuilder =
+                    new ExecutingStateBuilder().setScalingIntervalMin(scalingIntervalMin);
+            Executing exec = executingStateBuilder.build(ctx);
+            ctx.setCanScaleUp(true); // => rescale
+            ctx.setExpectRestarting( // scheduled rescale should restart the job after cooldown
+                    restartingArguments -> {
+                        assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO));

Review Comment:
   This test is a little weird: we expect the rescale to be scheduled after `scalingIntervalMin`, and the test finishes quickly, so the `Restarting` state shouldn't be created.
   
   However, the test still checks `ExpectRestarting` here.
   
   The root cause is that `MockExecutingContext#runIfState` ignores the `delay` argument; I think we should improve it.
   
   This test should instead check that:
   - the `Executing` state isn't created immediately
   - the `Executing` state is created after `scalingIntervalMin`
   
   
   WDYT?
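   One way to make `runIfState` honor the delay without slowing the test down is a manually advanced clock. Below is a hypothetical sketch: `DelayAwareRunIfStateSketch` and `advanceTime` are invented names, and the target-state argument of the real `runIfState` is omitted. Scheduled actions are queued and only run once the fake clock reaches their due time, so a test can assert both that nothing happens immediately and that the transition happens after `scalingIntervalMin`.

   ```java
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;

   /** Hypothetical delay-aware stand-in for the scheduling part of MockExecutingContext. */
   final class DelayAwareRunIfStateSketch {

       private static final class ScheduledAction {
           final Duration dueAt;
           final Runnable action;

           ScheduledAction(Duration dueAt, Runnable action) {
               this.dueAt = dueAt;
               this.action = action;
           }
       }

       private final List<ScheduledAction> pending = new ArrayList<>();
       private Duration now = Duration.ZERO;

       /** Queues the action instead of running it right away, so the delay is respected. */
       void runIfState(Runnable action, Duration delay) {
           pending.add(new ScheduledAction(now.plus(delay), action));
       }

       /** Moves the fake clock forward and runs every action that has become due. */
       void advanceTime(Duration elapsed) {
           now = now.plus(elapsed);
           List<ScheduledAction> due = new ArrayList<>();
           for (Iterator<ScheduledAction> it = pending.iterator(); it.hasNext(); ) {
               ScheduledAction scheduled = it.next();
               if (scheduled.dueAt.compareTo(now) <= 0) {
                   due.add(scheduled);
                   it.remove();
               }
           }
           // Run outside the loop so an action may safely schedule further actions.
           due.forEach(scheduled -> scheduled.action.run());
       }

       public static void main(String[] args) {
           DelayAwareRunIfStateSketch context = new DelayAwareRunIfStateSketch();
           Duration scalingIntervalMin = Duration.ofSeconds(1);
           boolean[] transitioned = {false};

           // What Executing would do when new resources arrive before the cooldown is over.
           context.runIfState(() -> transitioned[0] = true, scalingIntervalMin);

           if (transitioned[0]) {
               throw new AssertionError("no transition expected immediately");
           }
           context.advanceTime(scalingIntervalMin); // no real sleep needed
           if (!transitioned[0]) {
               throw new AssertionError("transition expected after scalingIntervalMin");
           }
           System.out.println("ok");
       }
   }
   ```

   Using a fake clock keeps the test deterministic; a real `Thread.sleep` of `scalingIntervalMin` would work too, but makes the test slower and timing-sensitive.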
   



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws Exception {
     }
 
     @Test
-    public void testNotifyNewResourcesAvailableWithCanScaleUpTransitionsToRestarting()
+    public void testNotifyNewResourcesAvailableBeforeCooldownIsOverScheduledStateChange()
+            throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            // do not wait too long in the test
+            final Duration scalingIntervalMin = Duration.ofSeconds(1L);
+            final ExecutingStateBuilder executingStateBuilder =
+                    new ExecutingStateBuilder().setScalingIntervalMin(scalingIntervalMin);
+            Executing exec = executingStateBuilder.build(ctx);
+            ctx.setCanScaleUp(true); // => rescale
+            ctx.setExpectRestarting( // scheduled rescale should restart the job after cooldown
+                    restartingArguments -> {
+                        assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(true));
+                    });
+            exec.onNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNotifyNewResourcesAvailableAfterCooldownIsOverStateChange() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            final ExecutingStateBuilder executingStateBuilder =
+                    new ExecutingStateBuilder()
+                            .setScalingIntervalMin(Duration.ofSeconds(20L))
+                            .setLastRescale(Instant.now().minus(Duration.ofSeconds(30L)));
+            Executing exec = executingStateBuilder.build(ctx);
+            ctx.setCanScaleUp(true); // => rescale
+            ctx.setExpectRestarting(
+                    restartingArguments -> { // immediate rescale
+                        assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(false));
+                    });
+            exec.onNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNotifyNewResourcesAvailableWithCanScaleUpWithoutForceTransitionsToRestarting()
             throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             Executing exec = new ExecutingStateBuilder().build(ctx);
 
             ctx.setExpectRestarting(
                     restartingArguments -> {
-                        // expect immediate restart on scale up
+                        // immediate rescale
                         assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO));
+                        assertThat(ctx.actionWasScheduled, is(false));
                     });
-            ctx.setCanScaleUp(() -> true);
+            ctx.setCanScaleUp(true); // => rescale
             exec.onNewResourcesAvailable();
         }
     }
 
     @Test
-    public void testNotifyNewResourcesAvailableWithNoResourcesAndNoStateChange() throws Exception {
+    public void testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCantScaleUpWithForce()
+            throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
-            Executing exec = new ExecutingStateBuilder().build(ctx);
-            ctx.setCanScaleUp(() -> false);
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setScalingIntervalMax(Duration.ofSeconds(1L))
+                            .build(ctx);
+            ctx.setCanScaleUp(
+                    false, false); // => schedule force rescale but resource lost on timeout =>

Review Comment:
   nit: it would be better to move each comment to the last line of the corresponding production code.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws Exception {
+    @Test
+    public void testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCanScaleUpWithForce()
+            throws Exception {

Review Comment:
   - If the minimum scaling requirements are not met:
     - case 1: if the last rescale happened more than `scaling-interval.max` ago, a rescale is forced.
     - case 2: otherwise, a forced rescale is scheduled to run after `scaling-interval.max`.

   This test covers case 1, but case 2 isn't covered, right?
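   A hypothetical sketch of the branch described above, written as a pure function so both cases can be asserted without timers. `ForcedRescaleCasesSketch` and `Decision` are invented names; the strict `compareTo(...) > 0` check mirrors the `timeSinceLastRescale().compareTo(scalingIntervalMax) > 0` condition in `Executing`, while treating a `null` `scalingIntervalMax` as "no forced rescale configured" is an assumption.

   ```java
   import java.time.Duration;

   /** Hypothetical model of case 1 / case 2 above; not the actual Executing implementation. */
   final class ForcedRescaleCasesSketch {

       enum Decision {
           FORCE_RESCALE_NOW,
           SCHEDULE_FORCED_RESCALE,
           NO_ACTION
       }

       /**
        * Assumes the minimum scaling requirements are NOT met. A null scalingIntervalMax is
        * assumed to mean that no forced rescale is configured.
        */
       static Decision decide(Duration timeSinceLastRescale, Duration scalingIntervalMax) {
           if (scalingIntervalMax == null) {
               return Decision.NO_ACTION;
           }
           if (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) {
               return Decision.FORCE_RESCALE_NOW; // case 1
           }
           return Decision.SCHEDULE_FORCED_RESCALE; // case 2, delay = scalingIntervalMax
       }

       private static void check(boolean condition, String message) {
           if (!condition) {
               throw new AssertionError(message);
           }
       }

       public static void main(String[] args) {
           Duration scalingIntervalMax = Duration.ofSeconds(10);
           // case 1: last rescale 30s ago, more than scalingIntervalMax.
           check(decide(Duration.ofSeconds(30), scalingIntervalMax) == Decision.FORCE_RESCALE_NOW, "case 1");
           // case 2: last rescale 3s ago, so the forced rescale is only scheduled.
           check(decide(Duration.ofSeconds(3), scalingIntervalMax) == Decision.SCHEDULE_FORCED_RESCALE, "case 2");
           System.out.println("ok");
       }
   }
   ```

   A case 2 test against the real `Executing` would set `scalingIntervalMax` larger than the time since the last rescale and assert that the forced rescale was scheduled rather than executed.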



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws Exception {
     @Test
-    public void testNotifyNewResourcesAvailableWithNoResourcesAndNoStateChange() throws Exception {
+    public void testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCantScaleUpWithForce()
+            throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
-            Executing exec = new ExecutingStateBuilder().build(ctx);
-            ctx.setCanScaleUp(() -> false);
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setScalingIntervalMax(Duration.ofSeconds(1L))
+                            .build(ctx);
+            ctx.setCanScaleUp(
+                    false, false); // => schedule force rescale but resource lost on timeout =>
+            // no rescale
             exec.onNewResourcesAvailable();
             ctx.assertNoStateTransition();
+            assertThat(ctx.actionWasScheduled, is(true));

Review Comment:
   This test may reveal a bug.

   `setCanScaleUp(false, false)` means no rescale should happen, yet `actionWasScheduled` is true. After debugging, I found that `context.runIfState(this, this::forceRescale, scalingIntervalMax);` always ends up being called when `scalingIntervalMax` isn't null. Is that expected?

   This happens because `maybeRescale` is called in the constructor of `Executing`:
   - `context.shouldRescale(getExecutionGraph(), false)` returns false when the resources haven't changed.
   - `scalingIntervalMax` is not null.
   - `timeSinceLastRescale()` is 0, so `timeSinceLastRescale().compareTo(scalingIntervalMax) > 0` is false.

   So the current code always schedules a `forceRescale` after `scalingIntervalMax`. I don't think this call path is intended.
   
   WDYT?
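   To make the reported path concrete, here is a hypothetical, heavily simplified reconstruction of the steps listed above. `ConstructorSchedulesForceRescaleSketch`, `RecordingContext` and `ExecutingLike` are invented names; `runIfState` drops the target-state argument of the real `context.runIfState(this, this::forceRescale, scalingIntervalMax)` call, and the regular-rescale branch ignores the cooldown. With both `shouldRescale` answers set to `false`, a non-null `scalingIntervalMax` and a time since the last rescale of zero, the constructor alone already sets `actionWasScheduled`.

   ```java
   import java.time.Duration;

   /** Hypothetical reconstruction of the call path described above; not the actual Executing code. */
   final class ConstructorSchedulesForceRescaleSketch {

       /** Stand-in for MockExecutingContext: answers shouldRescale and records scheduling. */
       static final class RecordingContext {
           final boolean canScaleUpWithoutForce;
           final boolean canScaleUpWithForce;
           boolean actionWasScheduled = false;
           Duration scheduledDelay = null;
           boolean transitioned = false;

           RecordingContext(boolean canScaleUpWithoutForce, boolean canScaleUpWithForce) {
               this.canScaleUpWithoutForce = canScaleUpWithoutForce;
               this.canScaleUpWithForce = canScaleUpWithForce;
           }

           boolean shouldRescale(boolean forceRescale) {
               return forceRescale ? canScaleUpWithForce : canScaleUpWithoutForce;
           }

           /** Only records the request; the delayed action is never run here. */
           void runIfState(Runnable action, Duration delay) {
               actionWasScheduled = true;
               scheduledDelay = delay;
           }
       }

       /** Stand-in for Executing: maybeRescale() already runs from the constructor. */
       static final class ExecutingLike {
           private final RecordingContext context;
           private final Duration scalingIntervalMax;
           private final Duration timeSinceLastRescale;

           ExecutingLike(RecordingContext context, Duration scalingIntervalMax, Duration timeSinceLastRescale) {
               this.context = context;
               this.scalingIntervalMax = scalingIntervalMax;
               this.timeSinceLastRescale = timeSinceLastRescale;
               maybeRescale();
           }

           private void maybeRescale() {
               if (context.shouldRescale(false)) {
                   context.transitioned = true; // regular rescale, cooldown handling omitted
               } else if (scalingIntervalMax != null) {
                   if (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0) {
                       forceRescale();
                   } else {
                       context.runIfState(this::forceRescale, scalingIntervalMax);
                   }
               }
           }

           private void forceRescale() {
               if (context.shouldRescale(true)) {
                   context.transitioned = true;
               }
           }
       }

       public static void main(String[] args) {
           RecordingContext ctx = new RecordingContext(false, false);
           new ExecutingLike(ctx, Duration.ofSeconds(1), Duration.ZERO);
           // Nothing can be rescaled, yet the constructor has already scheduled a forced rescale.
           System.out.println("actionWasScheduled=" + ctx.actionWasScheduled);
       }
   }
   ```

   Running `main` prints `actionWasScheduled=true`; with `scalingIntervalMax` set to `null`, nothing would be scheduled.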
   
   
   



-- 
This is an automated message from the Apache Git Service.