XComp commented on code in PR #26088:
URL: https://github.com/apache/flink/pull/26088#discussion_r1933392347
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java:
##########
@@ -128,17 +129,17 @@ interface ToRestarting extends StateTransitions {
* Restarting} state
* @param backoffTime backoffTime to wait before transitioning to the {@link Restarting}
* state
- * @param forcedRestart if the {@link WaitingForResources} state should be omitted and the
- * {@link CreatingExecutionGraph} state should be entered directly from the {@link
- * Restarting} state
+ * @param restartWithParallelism if the {@link WaitingForResources} state should be omitted
+ * and the {@link CreatingExecutionGraph} state should be entered directly from the
+ * {@link Restarting} state
Review Comment:
```suggestion
* @param restartWithParallelism the {@link VertexParallelism} that triggered the restarting. The {@code AdaptiveScheduler} should transition directly to {@link CreatingExecutionGraph} if the available parallelism hasn't changed while cancelling the job. If {@code null} is passed or the parallelism changed, {@link WaitingForResources} state should be the subsequent state.
```
nit (the suggestion above is not spotless-conformant): maybe we can make the
contract a bit more explicit here. 🤔
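
To illustrate the contract proposed in the Javadoc above, here is a minimal, self-contained sketch of the decision. It is not the PR's code: `RestartContractSketch`, `NextState`, and `nextState` are hypothetical names, and a plain `Map<String, Integer>` stands in for `VertexParallelism`.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the decision taken when leaving the Restarting state. */
final class RestartContractSketch {

    enum NextState {
        CREATING_EXECUTION_GRAPH,
        WAITING_FOR_RESOURCES
    }

    /**
     * @param restartWithParallelism parallelism that triggered the restart (may be null)
     * @param availableParallelism parallelism available after the job was cancelled (may be null)
     */
    static NextState nextState(
            Map<String, Integer> restartWithParallelism,
            Map<String, Integer> availableParallelism) {
        // No triggering parallelism was passed: take the regular path.
        if (restartWithParallelism == null) {
            return NextState.WAITING_FOR_RESOURCES;
        }
        // Unchanged parallelism: the waiting state can be skipped.
        return Objects.equals(restartWithParallelism, availableParallelism)
                ? NextState.CREATING_EXECUTION_GRAPH
                : NextState.WAITING_FOR_RESOURCES;
    }
}
```

With this shape, `null` and "parallelism changed" both lead to `WaitingForResources`, which is the behaviour the suggested Javadoc spells out.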
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java:
##########
@@ -112,13 +114,27 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) {
}
private void goToSubsequentState() {
- if (forcedRestart) {
+ if (restartWithParallelism != null && isForcedRestart(restartWithParallelism)) {
context.goToCreatingExecutionGraph(getExecutionGraph());
} else {
context.goToWaitingForResources(getExecutionGraph());
}
}
+ private boolean isForcedRestart(VertexParallelism restartWithParallelism) {
Review Comment:
```suggestion
private boolean isForcedRestart() {
if (this.restartWithParallelism == null) {
return false;
}
```
nit: we don't need to pass the field as a parameter here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java:
##########
@@ -112,13 +114,27 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) {
}
private void goToSubsequentState() {
- if (forcedRestart) {
+ if (restartWithParallelism != null && isForcedRestart(restartWithParallelism)) {
context.goToCreatingExecutionGraph(getExecutionGraph());
} else {
context.goToWaitingForResources(getExecutionGraph());
}
}
+ private boolean isForcedRestart(VertexParallelism restartWithParallelism) {
Review Comment:
```suggestion
private boolean availableParallelismNotChanged(VertexParallelism restartWithParallelism) {
```
nit: a more descriptive method name would help here.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -618,7 +618,8 @@ public void testOmitsWaitingForResourcesStateWhenRestarting() throws Exception {
final Executing testInstance = new ExecutingStateBuilder().build(ctx);
ctx.setExpectRestarting(
restartingArguments ->
- assertThat(restartingArguments.isForcedRestart()).isTrue());
+ assertThat(restartingArguments.getRestartWithParallelism())
+ .isNotEmpty());
Review Comment:
```java
final VertexParallelism vertexParallelism =
        new VertexParallelism(Collections.singletonMap(new JobVertexID(), 2));
ctx.setVertexParallelism(vertexParallelism);
ctx.setExpectRestarting(
        restartingArguments ->
                assertThat(restartingArguments.getRestartWithParallelism())
                        .hasValue(vertexParallelism));
```
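We could make this test more explicit by asserting the exact
`VertexParallelism` that is passed on, rather than only that a value is present.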
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java:
##########
@@ -44,7 +46,7 @@ class Restarting extends StateWithExecutionGraph {
@Nullable private ScheduledFuture<?> goToSubsequentStateFuture;
- private final boolean forcedRestart;
+ private final @Nullable VertexParallelism restartWithParallelism;
Review Comment:
I'm wondering whether we could reduce the code a bit by passing in the
`Optional<VertexParallelism>` returned by
`AdaptiveScheduler#getAvailableVertexParallelism` directly, rather than
transforming it into a `@Nullable` value. But it looks like we wouldn't gain
much here. ¯\_(ツ)_/¯
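
For comparison, a rough sketch of the two variants, assuming the check boils down to "the parallelism at restart time equals the currently available one". `OptionalVsNullableSketch` and both method names are hypothetical, and `String` stands in for `VertexParallelism`.

```java
import java.util.Optional;

/** Hypothetical comparison of the @Nullable and the Optional variant. */
final class OptionalVsNullableSketch {

    /** Variant A: unwrap the Optional into a nullable value, then null-check. */
    static boolean unchangedNullable(Optional<String> atRestart, Optional<String> current) {
        final String restartWithParallelism = atRestart.orElse(null);
        if (restartWithParallelism == null) {
            return false;
        }
        return current.map(restartWithParallelism::equals).orElse(false);
    }

    /** Variant B: keep the Optional end to end and rely on Optional#equals. */
    static boolean unchangedOptional(Optional<String> atRestart, Optional<String> current) {
        return atRestart.isPresent() && atRestart.equals(current);
    }
}
```

Both variants give the same answers; the `Optional` one only saves the explicit null check, which matches the impression that there is not much to gain.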