1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1357685699
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -570,8 +659,9 @@ public void setHowToHandleFailure(Function<Throwable, FailureResult> function) {
this.howToHandleFailure = function;
}
- public void setCanScaleUp(Supplier<Boolean> supplier) {
- this.canScaleUp = supplier;
+ public void setCanScaleUp(Boolean minIncreaseMet, Boolean parallelismChangeAfterTimeout) {
+ this.minIncreaseMet = minIncreaseMet;
+ this.parallelismChangeAfterTimeout = parallelismChangeAfterTimeout;
Review Comment:
Same comment as on the production code [1]: the Executing side only cares
about whether `forceRescale` is true or false, and shouldn't mention
`minIncreaseMet` or `parallelismChangeAfterTimeout`.
That's why the old code used `canScaleUp` instead of `minIncreaseMet`. I would
prefer renaming them to `canScaleUpWithoutForce` and `canScaleUpWithForce`.
Also, these two options can be set by two separate setter methods.
WDYT?
[1] https://github.com/apache/flink/pull/22985#discussion_r1356382291
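To make the suggestion concrete, here is a minimal sketch of what the mock context could look like with two separate setters. The class name `ScaleUpContextSketch` and the single-argument `shouldRescale(boolean forceRescale)` are assumptions for illustration only; the real `MockExecutingContext` and the production method signature may differ.

```java
// Hypothetical stand-in for the test's MockExecutingContext; not the actual Flink class.
class ScaleUpContextSketch {
    // Answer returned when the caller does not force a rescale.
    private boolean canScaleUpWithoutForce = false;
    // Answer returned when the caller forces a rescale.
    private boolean canScaleUpWithForce = false;

    void setCanScaleUpWithoutForce(boolean canScaleUpWithoutForce) {
        this.canScaleUpWithoutForce = canScaleUpWithoutForce;
    }

    void setCanScaleUpWithForce(boolean canScaleUpWithForce) {
        this.canScaleUpWithForce = canScaleUpWithForce;
    }

    // Assumed signature: the Executing side only passes the forceRescale flag
    // and never sees minIncreaseMet or parallelismChangeAfterTimeout.
    boolean shouldRescale(boolean forceRescale) {
        return forceRescale ? canScaleUpWithForce : canScaleUpWithoutForce;
    }

    public static void main(String[] args) {
        ScaleUpContextSketch ctx = new ScaleUpContextSketch();
        ctx.setCanScaleUpWithoutForce(true);
        System.out.println(ctx.shouldRescale(false)); // prints "true"
        System.out.println(ctx.shouldRescale(true));  // prints "false"
    }
}
```

With this shape, a test would call `ctx.setCanScaleUpWithoutForce(true)` in place of `ctx.setCanScaleUp(true, null)`, so the test reads in terms of `forceRescale` only.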
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +257,96 @@ public void testTransitionToFinishedOnSuspend() throws Exception {
}
@Test
- public void testNotifyNewResourcesAvailableWithCanScaleUpTransitionsToRestarting()
+ public void testNotifyNewResourcesAvailableBeforeCooldownIsOverScheduledStateChange()
+ throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final Duration scalingIntervalMin =
+ Duration.ofSeconds(1L); // do not wait too long in the test
+ final ExecutingStateBuilder executingStateBuilder =
+ new ExecutingStateBuilder().setScalingIntervalMin(scalingIntervalMin);
+ Executing exec = executingStateBuilder.build(ctx);
+ exec.setLastRescale(Instant.now());
+ ctx.setCanScaleUp(true, null); // min met => rescale
+ ctx.setExpectRestarting( // scheduled rescale should restart the job after cooldown
+ restartingArguments -> {
+ assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO));
+ assertThat(ctx.actionWasScheduled, is(true));
+ });
+ exec.onNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void testNotifyNewResourcesAvailableAfterCooldownIsOverStateChange() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final ExecutingStateBuilder executingStateBuilder =
+ new ExecutingStateBuilder().setScalingIntervalMin(Duration.ofSeconds(20L));
+ Executing exec = executingStateBuilder.build(ctx);
+ exec.setLastRescale(Instant.now().minus(Duration.ofSeconds(30L)));
+ ctx.setCanScaleUp(true, null); // min met => rescale
+ ctx.setExpectRestarting(
+ restartingArguments -> { // immediate rescale
+ assertThat(restartingArguments.getBackoffTime(), is(Duration.ZERO));
+ assertThat(ctx.actionWasScheduled, is(false));
+ });
+ exec.onNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void testNotifyNewResourcesAvailableWithMinMetTransitionsToRestarting()
Review Comment:
I also see that many method names and comments include `MinMet`, `MinNotMet`,
`min met`, `min not met`, etc.
Would you mind updating all of them to `withoutForce` or `withForce`?