This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3dd938d49ef7b7f0f68853ce0261aa4c43762bc5 Author: Till Rohrmann <[email protected]> AuthorDate: Wed Mar 17 12:26:38 2021 +0100 [FLINK-21602] Track asynchronous background tasks in the AdaptiveScheduler The AdaptiveScheduler triggers asynchronous background tasks such as the creation of the ExecutionGraph. In order to know when we can stop the checkpointing services we have to keep track of these asynchronous tasks and only shut down the services after the tasks are completed. This ensures that there are no concurrent accesses to the checkpoint services. This closes #15251. --- .../scheduler/adaptive/AdaptiveScheduler.java | 86 ++++++------ .../runtime/scheduler/adaptive/BackgroundTask.java | 130 +++++++++++++++++ .../scheduler/adaptive/AdaptiveSchedulerTest.java | 15 +- .../scheduler/adaptive/BackgroundTaskTest.java | 156 +++++++++++++++++++++ 4 files changed, 339 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 4615175..162c29b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -111,7 +111,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -186,6 +185,8 @@ public class AdaptiveScheduler private final MutableVertexAttemptNumberStore vertexAttemptNumberStore = new DefaultVertexAttemptNumberStore(); + private BackgroundTask<ExecutionGraph> backgroundTask = BackgroundTask.finishedBackgroundTask(); + public AdaptiveScheduler( JobGraph jobGraph, Configuration 
configuration, @@ -283,9 +284,42 @@ public class AdaptiveScheduler @Override public CompletableFuture<Void> closeAsync() { + LOG.debug("Closing the AdaptiveScheduler. Trying to suspend the current job execution."); + state.suspend(new FlinkException("AdaptiveScheduler is being stopped.")); - return FutureUtils.completedVoidFuture(); + Preconditions.checkState( + state instanceof Finished, + "Scheduler state should be finished after calling state.suspend."); + + backgroundTask.abort(); + // wait for the background task to finish and then close services + return FutureUtils.runAfterwardsAsync( + backgroundTask.getTerminationFuture(), + () -> stopCheckpointServicesSafely(jobTerminationFuture.get()), + getMainThreadExecutor()); + } + + private void stopCheckpointServicesSafely(JobStatus terminalState) { + LOG.debug("Stopping the checkpoint services with state {}.", terminalState); + + Exception exception = null; + + try { + completedCheckpointStore.shutdown(terminalState, checkpointsCleaner); + } catch (Exception e) { + exception = e; + } + + try { + checkpointIdCounter.shutdown(terminalState); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + LOG.warn("Failed to stop checkpoint services.", exception); + } } @Override @@ -746,24 +780,14 @@ public class AdaptiveScheduler private CompletableFuture<ExecutionGraph> createExecutionGraphAndRestoreStateAsync( JobGraph adjustedJobGraph) { - return CompletableFuture.supplyAsync( - () -> { - try { - return createExecutionGraphAndRestoreState(adjustedJobGraph); - } catch (Exception exception) { - throw new CompletionException(exception); - } - }, - ioExecutor) - .handleAsync( - (executionGraph, throwable) -> { - if (throwable != null) { - throw new CompletionException(throwable); - } else { - return executionGraph; - } - }, - getMainThreadExecutor()); + backgroundTask.abort(); + + backgroundTask = + backgroundTask.runAfter( + () -> 
createExecutionGraphAndRestoreState(adjustedJobGraph), ioExecutor); + + return FutureUtils.switchExecutor( + backgroundTask.getResultFuture(), getMainThreadExecutor()); } @Nonnull @@ -819,8 +843,6 @@ public class AdaptiveScheduler @Override public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) { - stopCheckpointServicesSafely(archivedExecutionGraph.getState()); - if (jobStatusListener != null) { jobStatusListener.jobStatusChanges( jobInformation.getJobID(), @@ -834,26 +856,6 @@ public class AdaptiveScheduler jobTerminationFuture.complete(archivedExecutionGraph.getState()); } - private void stopCheckpointServicesSafely(JobStatus terminalState) { - Exception exception = null; - - try { - completedCheckpointStore.shutdown(terminalState, checkpointsCleaner); - } catch (Exception e) { - exception = e; - } - - try { - checkpointIdCounter.shutdown(terminalState); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Failed to stop checkpoint services.", exception); - } - } - @Override public Executing.FailureResult howToHandleFailure(Throwable failure) { if (ExecutionFailureHandler.isUnrecoverableError(failure)) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java new file mode 100644 index 0000000..7452c07 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.SupplierWithException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * BackgroundTask encapsulates an asynchronous operation which can produce a result. The result can + * be accessed via {@link BackgroundTask#getResultFuture()}. Additionally, the task allows tracking + * its completion via {@link BackgroundTask#getTerminationFuture()}. + * + * <p>In order to ensure the order of background tasks, one can use {@link + * BackgroundTask#runAfter} to schedule tasks which are executed after this task has completed. + * Tasks which are executed sequentially like this won't be affected by the outcome of previous + * tasks. This means that a failed task won't stop subsequent tasks from being executed. + * + * @param <T> type of the produced result + */ +final class BackgroundTask<T> { + private final CompletableFuture<Void> terminationFuture; + + private final CompletableFuture<T> resultFuture; + + private volatile boolean isAborted = false; + + private BackgroundTask( + CompletableFuture<Void> previousTerminationFuture, + SupplierWithException<? extends T, ? 
extends Exception> task, + Executor executor) { + resultFuture = + previousTerminationFuture.thenApplyAsync( + ignored -> { + if (!isAborted) { + try { + return task.get(); + } catch (Exception exception) { + throw new CompletionException(exception); + } + } else { + throw new CompletionException( + new FlinkException("Background task has been aborted.")); + } + }, + executor); + + terminationFuture = resultFuture.handle((ignored, ignoredThrowable) -> null); + } + + private BackgroundTask() { + terminationFuture = FutureUtils.completedVoidFuture(); + resultFuture = + FutureUtils.completedExceptionally( + new FlinkException( + "No result has been created because it is a finished background task.")); + } + + /** + * Abort the execution of this background task. This method has only an effect if the background + * task has not been started yet. + */ + void abort() { + isAborted = true; + } + + public CompletableFuture<T> getResultFuture() { + return resultFuture; + } + + CompletableFuture<Void> getTerminationFuture() { + return terminationFuture; + } + + /** + * Runs the given task after this background task has completed (normally or exceptionally). + * + * @param task task to run after this background task has completed + * @param executor executor to run the task + * @param <V> type of the result + * @return new {@link BackgroundTask} representing the new task to execute + */ + <V> BackgroundTask<V> runAfter( + SupplierWithException<? extends V, ? extends Exception> task, Executor executor) { + return new BackgroundTask<>(terminationFuture, task, executor); + } + + /** + * Creates a finished background task which can be used as the start of a background task chain. + * + * @param <V> type of the background task + * @return A finished background task + */ + static <V> BackgroundTask<V> finishedBackgroundTask() { + return new BackgroundTask<>(); + } + + /** + * Creates an initial background task. This means that this background task has no predecessor. 
+ * + * @param task task to run + * @param executor executor to run the task + * @param <V> type of the result + * @return initial {@link BackgroundTask} representing the task to execute + */ + static <V> BackgroundTask<V> initialBackgroundTask( + SupplierWithException<? extends V, ? extends Exception> task, Executor executor) { + return new BackgroundTask<>(FutureUtils.completedVoidFuture(), task, executor); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 9bbf7a5..af7f00d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -568,7 +568,7 @@ public class AdaptiveSchedulerTest extends TestLogger { } @Test - public void testGoToFinishedShutsDownCheckpointingComponents() throws Exception { + public void testCloseShutsDownCheckpointingComponents() throws Exception { final CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture = new CompletableFuture<>(); final CompletedCheckpointStore completedCheckpointStore = @@ -586,16 +586,19 @@ public class AdaptiveSchedulerTest extends TestLogger { CheckpointCoordinatorConfiguration.builder().build(), null)); final AdaptiveScheduler scheduler = - new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor) + new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) .setCheckpointRecoveryFactory( new TestingCheckpointRecoveryFactory( completedCheckpointStore, checkpointIdCounter)) .build(); - final ArchivedExecutionGraph archivedExecutionGraph = - new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build(); - - scheduler.goToFinished(archivedExecutionGraph); + singleThreadMainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + // transition 
into the FAILED state + scheduler.handleGlobalFailure(new FlinkException("Test exception")); + scheduler.closeAsync(); + }); assertThat(completedCheckpointStoreShutdownFuture.get(), is(JobStatus.FAILED)); assertThat(checkpointIdCounterShutdownFuture.get(), is(JobStatus.FAILED)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java new file mode 100644 index 0000000..d224ff6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.testutils.executor.TestExecutorResource; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Matchers; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for the {@link BackgroundTask}. */ +public class BackgroundTaskTest extends TestLogger { + + @ClassRule + public static final TestExecutorResource<ExecutorService> TEST_EXECUTOR_RESOURCE = + new TestExecutorResource<>(() -> Executors.newFixedThreadPool(2)); + + @Test + public void testFinishedBackgroundTaskIsTerminated() { + final BackgroundTask<Void> finishedBackgroundTask = BackgroundTask.finishedBackgroundTask(); + + assertTrue(finishedBackgroundTask.getTerminationFuture().isDone()); + finishedBackgroundTask.getTerminationFuture().join(); + } + + @Test + public void testFinishedBackgroundTaskDoesNotContainAResult() { + final BackgroundTask<Void> finishedBackgroundTask = BackgroundTask.finishedBackgroundTask(); + + assertTrue(finishedBackgroundTask.getResultFuture().isCompletedExceptionally()); + } + + @Test + public void testNormalCompletionOfBackgroundTask() { + final String expectedValue = "foobar"; + final BackgroundTask<String> backgroundTask = + BackgroundTask.finishedBackgroundTask() + .runAfter(() -> expectedValue, TEST_EXECUTOR_RESOURCE.getExecutor()); + + assertThat(backgroundTask.getResultFuture().join(), Matchers.is(expectedValue)); + // check that the termination future has completed normally + 
backgroundTask.getTerminationFuture().join(); + } + + @Test + public void testExceptionalCompletionOfBackgroundTask() throws InterruptedException { + final Exception expectedException = new Exception("Test exception"); + final BackgroundTask<String> backgroundTask = + BackgroundTask.finishedBackgroundTask() + .runAfter( + () -> { + throw expectedException; + }, + TEST_EXECUTOR_RESOURCE.getExecutor()); + + try { + backgroundTask.getResultFuture().get(); + fail("Expected an exceptionally completed result future."); + } catch (ExecutionException ee) { + assertThat(ee, FlinkMatchers.containsCause(expectedException)); + } + // check that the termination future has completed normally + backgroundTask.getTerminationFuture().join(); + } + + @Test + public void testRunAfterExecutesBackgroundTaskAfterPreviousHasCompleted() { + final OneShotLatch blockingLatch = new OneShotLatch(); + final BlockingQueue<Integer> taskCompletions = new ArrayBlockingQueue<>(2); + final BackgroundTask<Void> backgroundTask = + BackgroundTask.initialBackgroundTask( + () -> { + blockingLatch.await(); + taskCompletions.offer(1); + return null; + }, + TEST_EXECUTOR_RESOURCE.getExecutor()) + .runAfter( + () -> { + taskCompletions.offer(2); + return null; + }, + TEST_EXECUTOR_RESOURCE.getExecutor()); + + blockingLatch.trigger(); + + backgroundTask.getTerminationFuture().join(); + + assertThat(taskCompletions, Matchers.contains(1, 2)); + } + + @Test + public void testAbortSkipsTasksWhichHaveNotBeenStarted() { + final OneShotLatch blockingLatch = new OneShotLatch(); + final BlockingQueue<Integer> taskCompletions = new ArrayBlockingQueue<>(2); + final BackgroundTask<Void> backgroundTask = + BackgroundTask.initialBackgroundTask( + () -> { + blockingLatch.await(); + taskCompletions.offer(1); + return null; + }, + TEST_EXECUTOR_RESOURCE.getExecutor()) + .runAfter( + () -> { + taskCompletions.offer(2); + return null; + }, + TEST_EXECUTOR_RESOURCE.getExecutor()); + + final BackgroundTask<Void> finalTask = + 
backgroundTask.runAfter( + () -> { + taskCompletions.offer(3); + return null; + }, + TEST_EXECUTOR_RESOURCE.getExecutor()); + + backgroundTask.abort(); + + blockingLatch.trigger(); + + finalTask.getTerminationFuture().join(); + + assertThat(taskCompletions, Matchers.contains(1, 3)); + } +}
