echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1362218036
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -252,28 +259,94 @@ public void testTransitionToFinishedOnSuspend() throws
Exception {
}
@Test
- public void
testNotifyNewResourcesAvailableWithCanScaleUpTransitionsToRestarting()
+ public void
testNotifyNewResourcesAvailableBeforeCooldownIsOverScheduledStateChange()
+ throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ // do not wait too long in the test
+ final Duration scalingIntervalMin = Duration.ofSeconds(1L);
+ final ExecutingStateBuilder executingStateBuilder =
+ new
ExecutingStateBuilder().setScalingIntervalMin(scalingIntervalMin);
+ Executing exec = executingStateBuilder.build(ctx);
+ ctx.setCanScaleUp(true); // => rescale
+ ctx.setExpectRestarting( // scheduled rescale should restart the
job after cooldown
+ restartingArguments -> {
+ assertThat(restartingArguments.getBackoffTime(),
is(Duration.ZERO));
+ assertThat(ctx.actionWasScheduled, is(true));
+ });
+ exec.onNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void
testNotifyNewResourcesAvailableAfterCooldownIsOverStateChange() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final ExecutingStateBuilder executingStateBuilder =
+ new ExecutingStateBuilder()
+ .setScalingIntervalMin(Duration.ofSeconds(20L))
+
.setLastRescale(Instant.now().minus(Duration.ofSeconds(30L)));
+ Executing exec = executingStateBuilder.build(ctx);
+ ctx.setCanScaleUp(true); // => rescale
+ ctx.setExpectRestarting(
+ restartingArguments -> { // immediate rescale
+ assertThat(restartingArguments.getBackoffTime(),
is(Duration.ZERO));
+ assertThat(ctx.actionWasScheduled, is(false));
+ });
+ exec.onNewResourcesAvailable();
+ }
+ }
+
+ @Test
+ public void
testNotifyNewResourcesAvailableWithCanScaleUpWithoutForceTransitionsToRestarting()
throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
Executing exec = new ExecutingStateBuilder().build(ctx);
ctx.setExpectRestarting(
restartingArguments -> {
- // expect immediate restart on scale up
+ // immediate rescale
assertThat(restartingArguments.getBackoffTime(),
is(Duration.ZERO));
+ assertThat(ctx.actionWasScheduled, is(false));
});
- ctx.setCanScaleUp(() -> true);
+ ctx.setCanScaleUp(true); // => rescale
exec.onNewResourcesAvailable();
}
}
@Test
- public void
testNotifyNewResourcesAvailableWithNoResourcesAndNoStateChange() throws
Exception {
+ public void
testNotifyNewResourcesAvailableWithCantScaleUpWithoutForceAndCantScaleUpWithForce()
+ throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
- Executing exec = new ExecutingStateBuilder().build(ctx);
- ctx.setCanScaleUp(() -> false);
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setScalingIntervalMax(Duration.ofSeconds(1L))
+ .build(ctx);
+ ctx.setCanScaleUp(
+ false, false); // => schedule force rescale but resource
lost on timeout =>
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]