XComp commented on code in PR #26088:
URL: https://github.com/apache/flink/pull/26088#discussion_r1933392347


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java:
##########
@@ -128,17 +129,17 @@ interface ToRestarting extends StateTransitions {
          *     Restarting} state
          * @param backoffTime backoffTime to wait before transitioning to the {@link Restarting}
          *     state
-         * @param forcedRestart if the {@link WaitingForResources} state should be omitted and the
-         *     {@link CreatingExecutionGraph} state should be entered directly from the {@link
-         *     Restarting} state
+         * @param restartWithParallelism if the {@link WaitingForResources} state should be omitted
+         *     and the {@link CreatingExecutionGraph} state should be entered directly from the
+         *     {@link Restarting} state

Review Comment:
   ```suggestion
            * @param restartWithParallelism the {@link VertexParallelism} that triggered the restarting. The {@code AdaptiveScheduler} should transition directly to {@link CreatingExecutionGraph} if the available parallelism hasn't changed while cancelling the job. If {@code null} is passed or the parallelism changed, {@link WaitingForResources} state should be the subsequent state.
   ```
   nit (not spotless-conformant): Maybe we can make the contract a bit more explicit here. 🤔
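
The contract described in the suggestion above can be sketched as a small, self-contained decision function. This is only an illustration under stated assumptions: `RestartContractSketch`, `NextState`, and `subsequentState` are hypothetical names, and a plain `Map<String, Integer>` stands in for `VertexParallelism`. It is not the actual `Restarting` implementation.

```java
import java.util.Map;
import java.util.Optional;

public class RestartContractSketch {

    /** Stand-in for the two states that may follow Restarting. */
    enum NextState { CREATING_EXECUTION_GRAPH, WAITING_FOR_RESOURCES }

    /**
     * Hypothetical helper mirroring the proposed Javadoc contract.
     * A Map stands in for VertexParallelism; "available" models the
     * parallelism that is available once the job has been cancelled.
     */
    static NextState subsequentState(
            Map<String, Integer> restartWithParallelism, // may be null
            Optional<Map<String, Integer>> available) {
        if (restartWithParallelism == null) {
            // No parallelism was passed: wait for resources.
            return NextState.WAITING_FOR_RESOURCES;
        }
        // Skip WaitingForResources only if the available parallelism is unchanged.
        boolean unchanged = available.map(restartWithParallelism::equals).orElse(false);
        return unchanged
                ? NextState.CREATING_EXECUTION_GRAPH
                : NextState.WAITING_FOR_RESOURCES;
    }

    public static void main(String[] args) {
        Map<String, Integer> requested = Map.of("vertex-a", 2);
        // Unchanged parallelism: prints CREATING_EXECUTION_GRAPH
        System.out.println(subsequentState(requested, Optional.of(Map.of("vertex-a", 2))));
        // Changed parallelism: prints WAITING_FOR_RESOURCES
        System.out.println(subsequentState(requested, Optional.of(Map.of("vertex-a", 1))));
        // null passed: prints WAITING_FOR_RESOURCES
        System.out.println(subsequentState(null, Optional.of(requested)));
    }
}
```

Writing the three cases out this way makes it easier to check that the Javadoc wording covers each branch.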



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java:
##########
@@ -112,13 +114,27 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) {
     }
 
     private void goToSubsequentState() {
-        if (forcedRestart) {
+        if (restartWithParallelism != null && isForcedRestart(restartWithParallelism)) {
             context.goToCreatingExecutionGraph(getExecutionGraph());
         } else {
             context.goToWaitingForResources(getExecutionGraph());
         }
     }
 
+    private boolean isForcedRestart(VertexParallelism restartWithParallelism) {

Review Comment:
   ```suggestion
       private boolean isForcedRestart() {
           if (this.restartWithParallelism == null) {
               return false;
           }
   ```
   nit: we don't need to pass the field as a parameter here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java:
##########
@@ -112,13 +114,27 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) {
     }
 
     private void goToSubsequentState() {
-        if (forcedRestart) {
+        if (restartWithParallelism != null && isForcedRestart(restartWithParallelism)) {
             context.goToCreatingExecutionGraph(getExecutionGraph());
         } else {
             context.goToWaitingForResources(getExecutionGraph());
         }
     }
 
+    private boolean isForcedRestart(VertexParallelism restartWithParallelism) {

Review Comment:
   ```suggestion
       private boolean availableParallelismNotChanged(VertexParallelism restartWithParallelism) {
   ```
   nit: this would make the method name more descriptive.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -618,7 +618,8 @@ public void testOmitsWaitingForResourcesStateWhenRestarting() throws Exception {
             final Executing testInstance = new ExecutingStateBuilder().build(ctx);
             ctx.setExpectRestarting(
                     restartingArguments ->
-                            assertThat(restartingArguments.isForcedRestart()).isTrue());
+                            assertThat(restartingArguments.getRestartWithParallelism())
+                                    .isNotEmpty());

Review Comment:
   ```java
               final VertexParallelism vertexParallelism =
                       new VertexParallelism(Collections.singletonMap(new JobVertexID(), 2));
               ctx.setVertexParallelism(vertexParallelism);
               ctx.setExpectRestarting(
                       restartingArguments ->
                               assertThat(restartingArguments.getRestartWithParallelism())
                                       .hasValue(vertexParallelism));
   ```
   We could make this test more explicit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java:
##########
@@ -44,7 +46,7 @@ class Restarting extends StateWithExecutionGraph {
 
     @Nullable private ScheduledFuture<?> goToSubsequentStateFuture;
 
-    private final boolean forcedRestart;
+    private final @Nullable VertexParallelism restartWithParallelism;

Review Comment:
   I'm wondering whether we could reduce the code a bit by passing in the
`Optional<VertexParallelism>` that's returned from
`AdaptiveScheduler#getAvailableVertexParallelism` as-is, rather than transforming it
into a `@Nullable` value. But it looks like we wouldn't gain much here.
¯\_(ツ)_/¯
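
To make the trade-off above concrete, here is a rough side-by-side of the two variants. Everything in it is an assumption for illustration: `OptionalVsNullableSketch` and both method names are hypothetical, and `Integer` stands in for `VertexParallelism`. It only shows how the comparison might read in each style, not how `Restarting` is actually written.

```java
import java.util.Optional;

public class OptionalVsNullableSketch {

    /** Variant 1: the Optional is unwrapped up front and a nullable value is stored. */
    static boolean unchangedNullable(Integer restartWith, Optional<Integer> available) {
        return restartWith != null
                && available.isPresent()
                && restartWith.equals(available.get());
    }

    /** Variant 2: the Optional is passed through and stored as-is. */
    static boolean unchangedOptional(Optional<Integer> restartWith, Optional<Integer> available) {
        // Two empty Optionals are equal, so presence must be checked explicitly.
        return restartWith.isPresent() && restartWith.equals(available);
    }

    public static void main(String[] args) {
        Optional<Integer> fromScheduler = Optional.of(2);

        // Variant 1 needs an extra conversion step at the call site.
        Integer nullable = fromScheduler.orElse(null);
        System.out.println(unchangedNullable(nullable, Optional.of(2)));

        // Variant 2 forwards the Optional directly.
        System.out.println(unchangedOptional(fromScheduler, Optional.of(2)));
    }
}
```

Both variants still need an explicit presence check, which is consistent with the observation that little is gained either way.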



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
