This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ca968d305a99b63162136589e1d9f6ba4c9cdd2b Author: Till Rohrmann <[email protected]> AuthorDate: Sun Mar 14 13:42:49 2021 +0100 [hotfix] Let Restarting and WaitingForResources cancel scheduled tasks onLeave By canceling incomplete scheduled tasks when leaving, the states Restarting and WaitingForResources leave a cleaner state behind. --- .../flink/runtime/scheduler/adaptive/Restarting.java | 16 +++++++++++++++- .../runtime/scheduler/adaptive/WaitingForResources.java | 12 ++++++++++-- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 3 ++- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index 307dfa02..ffd7eb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -27,6 +27,8 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.concurrent.ScheduledFuture; @@ -37,6 +39,8 @@ class Restarting extends StateWithExecutionGraph { private final Duration backoffTime; + @Nullable private ScheduledFuture<?> goToWaitingForResourcesFuture; + Restarting( Context context, ExecutionGraph executionGraph, @@ -52,6 +56,15 @@ class Restarting extends StateWithExecutionGraph { } @Override + public void onLeave(Class<? 
extends State> newState) { + if (goToWaitingForResourcesFuture != null) { + goToWaitingForResourcesFuture.cancel(false); + } + + super.onLeave(newState); + } + + @Override public JobStatus getJobStatus() { return JobStatus.RESTARTING; } @@ -75,7 +88,8 @@ class Restarting extends StateWithExecutionGraph { @Override void onGloballyTerminalState(JobStatus globallyTerminalState) { Preconditions.checkArgument(globallyTerminalState == JobStatus.CANCELED); - context.runIfState(this, context::goToWaitingForResources, backoffTime); + goToWaitingForResourcesFuture = + context.runIfState(this, context::goToWaitingForResources, backoffTime); } /** Context of the {@link Restarting} state. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java index 9489ddd..3b2daff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java @@ -41,6 +41,8 @@ class WaitingForResources implements State, ResourceConsumer { private final ResourceCounter desiredResources; + private final ScheduledFuture<?> resourceTimeoutFuture; + WaitingForResources( Context context, Logger log, @@ -53,12 +55,18 @@ class WaitingForResources implements State, ResourceConsumer { !desiredResources.isEmpty(), "Desired resources must not be empty"); // since state transitions are not allowed in state constructors, schedule calls for later. - context.runIfState( - this, this::resourceTimeout, Preconditions.checkNotNull(resourceTimeout)); + resourceTimeoutFuture = + context.runIfState( + this, this::resourceTimeout, Preconditions.checkNotNull(resourceTimeout)); context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO); } @Override + public void onLeave(Class<? 
extends State> newState) { + resourceTimeoutFuture.cancel(false); + } + + @Override public void cancel() { context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 0489e77..8a433a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -76,6 +76,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -769,7 +770,7 @@ public class AdaptiveSchedulerTest extends TestLogger { archivedExecutionGraph.getFailureInfo().getException(), FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint")); } finally { - singleThreadExecutor.shutdownNow(); + ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, singleThreadExecutor); } }
