This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca968d305a99b63162136589e1d9f6ba4c9cdd2b
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Mar 14 13:42:49 2021 +0100

    [hotfix] Let Restarting and WaitingForResources cancel scheduled tasks onLeave
    
    By canceling incomplete scheduled tasks when leaving, the states Restarting and
    WaitingForResources leave a cleaner state behind.
---
 .../flink/runtime/scheduler/adaptive/Restarting.java     | 16 +++++++++++++++-
 .../runtime/scheduler/adaptive/WaitingForResources.java  | 12 ++++++++++--
 .../scheduler/adaptive/AdaptiveSchedulerTest.java        |  3 ++-
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index 307dfa02..ffd7eb6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -27,6 +27,8 @@ import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.concurrent.ScheduledFuture;
 
@@ -37,6 +39,8 @@ class Restarting extends StateWithExecutionGraph {
 
     private final Duration backoffTime;
 
+    @Nullable private ScheduledFuture<?> goToWaitingForResourcesFuture;
+
     Restarting(
             Context context,
             ExecutionGraph executionGraph,
@@ -52,6 +56,15 @@ class Restarting extends StateWithExecutionGraph {
     }
 
     @Override
+    public void onLeave(Class<? extends State> newState) {
+        if (goToWaitingForResourcesFuture != null) {
+            goToWaitingForResourcesFuture.cancel(false);
+        }
+
+        super.onLeave(newState);
+    }
+
+    @Override
     public JobStatus getJobStatus() {
         return JobStatus.RESTARTING;
     }
@@ -75,7 +88,8 @@ class Restarting extends StateWithExecutionGraph {
     @Override
     void onGloballyTerminalState(JobStatus globallyTerminalState) {
         Preconditions.checkArgument(globallyTerminalState == 
JobStatus.CANCELED);
-        context.runIfState(this, context::goToWaitingForResources, 
backoffTime);
+        goToWaitingForResourcesFuture =
+                context.runIfState(this, context::goToWaitingForResources, 
backoffTime);
     }
 
     /** Context of the {@link Restarting} state. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index 9489ddd..3b2daff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -41,6 +41,8 @@ class WaitingForResources implements State, ResourceConsumer {
 
     private final ResourceCounter desiredResources;
 
+    private final ScheduledFuture<?> resourceTimeoutFuture;
+
     WaitingForResources(
             Context context,
             Logger log,
@@ -53,12 +55,18 @@ class WaitingForResources implements State, 
ResourceConsumer {
                 !desiredResources.isEmpty(), "Desired resources must not be 
empty");
 
         // since state transitions are not allowed in state constructors, 
schedule calls for later.
-        context.runIfState(
-                this, this::resourceTimeout, 
Preconditions.checkNotNull(resourceTimeout));
+        resourceTimeoutFuture =
+                context.runIfState(
+                        this, this::resourceTimeout, 
Preconditions.checkNotNull(resourceTimeout));
         context.runIfState(this, this::notifyNewResourcesAvailable, 
Duration.ZERO);
     }
 
     @Override
+    public void onLeave(Class<? extends State> newState) {
+        resourceTimeoutFuture.cancel(false);
+    }
+
+    @Override
     public void cancel() {
         
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, 
null));
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 0489e77..8a433a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -76,6 +76,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -769,7 +770,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
                     archivedExecutionGraph.getFailureInfo().getException(),
                     FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
         } finally {
-            singleThreadExecutor.shutdownNow();
+            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, 
singleThreadExecutor);
         }
     }
 

Reply via email to