This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3dd938d49ef7b7f0f68853ce0261aa4c43762bc5
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Mar 17 12:26:38 2021 +0100

    [FLINK-21602] Track asynchronous background tasks in the AdaptiveScheduler
    
    The AdaptiveScheduler triggers asynchronous background tasks such as the creation of the ExecutionGraph.
    In order to know when we can stop the checkpointing services we have to keep track of these asynchronous
    tasks and only shut down the services after the tasks are completed. This ensures that there are no
    concurrent accesses to the checkpoint services.
    
    This closes #15251.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  86 ++++++------
 .../runtime/scheduler/adaptive/BackgroundTask.java | 130 +++++++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  15 +-
 .../scheduler/adaptive/BackgroundTaskTest.java     | 156 +++++++++++++++++++++
 4 files changed, 339 insertions(+), 48 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4615175..162c29b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -111,7 +111,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -186,6 +185,8 @@ public class AdaptiveScheduler
     private final MutableVertexAttemptNumberStore vertexAttemptNumberStore =
             new DefaultVertexAttemptNumberStore();
 
+    private BackgroundTask<ExecutionGraph> backgroundTask = 
BackgroundTask.finishedBackgroundTask();
+
     public AdaptiveScheduler(
             JobGraph jobGraph,
             Configuration configuration,
@@ -283,9 +284,42 @@ public class AdaptiveScheduler
 
     @Override
     public CompletableFuture<Void> closeAsync() {
+        LOG.debug("Closing the AdaptiveScheduler. Trying to suspend the 
current job execution.");
+
         state.suspend(new FlinkException("AdaptiveScheduler is being 
stopped."));
 
-        return FutureUtils.completedVoidFuture();
+        Preconditions.checkState(
+                state instanceof Finished,
+                "Scheduler state should be finished after calling 
state.suspend.");
+
+        backgroundTask.abort();
+        // wait for the background task to finish and then close services
+        return FutureUtils.runAfterwardsAsync(
+                backgroundTask.getTerminationFuture(),
+                () -> stopCheckpointServicesSafely(jobTerminationFuture.get()),
+                getMainThreadExecutor());
+    }
+
+    private void stopCheckpointServicesSafely(JobStatus terminalState) {
+        LOG.debug("Stopping the checkpoint services with state {}.", 
terminalState);
+
+        Exception exception = null;
+
+        try {
+            completedCheckpointStore.shutdown(terminalState, 
checkpointsCleaner);
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        try {
+            checkpointIdCounter.shutdown(terminalState);
+        } catch (Exception e) {
+            exception = ExceptionUtils.firstOrSuppressed(e, exception);
+        }
+
+        if (exception != null) {
+            LOG.warn("Failed to stop checkpoint services.", exception);
+        }
     }
 
     @Override
@@ -746,24 +780,14 @@ public class AdaptiveScheduler
 
     private CompletableFuture<ExecutionGraph> 
createExecutionGraphAndRestoreStateAsync(
             JobGraph adjustedJobGraph) {
-        return CompletableFuture.supplyAsync(
-                        () -> {
-                            try {
-                                return 
createExecutionGraphAndRestoreState(adjustedJobGraph);
-                            } catch (Exception exception) {
-                                throw new CompletionException(exception);
-                            }
-                        },
-                        ioExecutor)
-                .handleAsync(
-                        (executionGraph, throwable) -> {
-                            if (throwable != null) {
-                                throw new CompletionException(throwable);
-                            } else {
-                                return executionGraph;
-                            }
-                        },
-                        getMainThreadExecutor());
+        backgroundTask.abort();
+
+        backgroundTask =
+                backgroundTask.runAfter(
+                        () -> 
createExecutionGraphAndRestoreState(adjustedJobGraph), ioExecutor);
+
+        return FutureUtils.switchExecutor(
+                backgroundTask.getResultFuture(), getMainThreadExecutor());
     }
 
     @Nonnull
@@ -819,8 +843,6 @@ public class AdaptiveScheduler
 
     @Override
     public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
-        stopCheckpointServicesSafely(archivedExecutionGraph.getState());
-
         if (jobStatusListener != null) {
             jobStatusListener.jobStatusChanges(
                     jobInformation.getJobID(),
@@ -834,26 +856,6 @@ public class AdaptiveScheduler
         jobTerminationFuture.complete(archivedExecutionGraph.getState());
     }
 
-    private void stopCheckpointServicesSafely(JobStatus terminalState) {
-        Exception exception = null;
-
-        try {
-            completedCheckpointStore.shutdown(terminalState, 
checkpointsCleaner);
-        } catch (Exception e) {
-            exception = e;
-        }
-
-        try {
-            checkpointIdCounter.shutdown(terminalState);
-        } catch (Exception e) {
-            exception = ExceptionUtils.firstOrSuppressed(e, exception);
-        }
-
-        if (exception != null) {
-            LOG.warn("Failed to stop checkpoint services.", exception);
-        }
-    }
-
     @Override
     public Executing.FailureResult howToHandleFailure(Throwable failure) {
         if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java
new file mode 100644
index 0000000..7452c07
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * BackgroundTask encapsulates an asynchronous operation which can produce a 
result. The result can
+ * be accessed via {@link BackgroundTask#getResultFuture()}. Additionally, the 
task allows to track
+ * its completion via {@link BackgroundTask#getTerminationFuture()}.
+ *
+ * <p>In order to ensure the order of background tasks, one can use the {@link
+ * BackgroundTask#runAfter} to schedule tasks which are executed after this 
task has completed.
+ * Tasks which are executed sequentially like this won't be affected by the 
outcome of previous
+ * tasks. This means that a failed task won't stop succeeding tasks from being 
executed.
+ *
+ * @param <T> type of the produced result
+ */
+final class BackgroundTask<T> {
+    private final CompletableFuture<Void> terminationFuture;
+
+    private final CompletableFuture<T> resultFuture;
+
+    private volatile boolean isAborted = false;
+
+    private BackgroundTask(
+            CompletableFuture<Void> previousTerminationFuture,
+            SupplierWithException<? extends T, ? extends Exception> task,
+            Executor executor) {
+        resultFuture =
+                previousTerminationFuture.thenApplyAsync(
+                        ignored -> {
+                            if (!isAborted) {
+                                try {
+                                    return task.get();
+                                } catch (Exception exception) {
+                                    throw new CompletionException(exception);
+                                }
+                            } else {
+                                throw new CompletionException(
+                                        new FlinkException("Background task 
has been aborted."));
+                            }
+                        },
+                        executor);
+
+        terminationFuture = resultFuture.handle((ignored, ignoredThrowable) -> 
null);
+    }
+
+    private BackgroundTask() {
+        terminationFuture = FutureUtils.completedVoidFuture();
+        resultFuture =
+                FutureUtils.completedExceptionally(
+                        new FlinkException(
+                                "No result has been created because it is a 
finished background task."));
+    }
+
+    /**
+     * Abort the execution of this background task. This method has only an 
effect if the background
+     * task has not been started yet.
+     */
+    void abort() {
+        isAborted = true;
+    }
+
+    public CompletableFuture<T> getResultFuture() {
+        return resultFuture;
+    }
+
+    CompletableFuture<Void> getTerminationFuture() {
+        return terminationFuture;
+    }
+
+    /**
+     * Runs the given task after this background task has completed (normally 
or exceptionally).
+     *
+     * @param task task to run after this background task has completed
+     * @param executor executor to run the task
+     * @param <V> type of the result
+     * @return new {@link BackgroundTask} representing the new task to execute
+     */
+    <V> BackgroundTask<V> runAfter(
+            SupplierWithException<? extends V, ? extends Exception> task, 
Executor executor) {
+        return new BackgroundTask<>(terminationFuture, task, executor);
+    }
+
+    /**
+     * Creates a finished background task which can be used as the start of a 
background task chain.
+     *
+     * @param <V> type of the background task
+     * @return A finished background task
+     */
+    static <V> BackgroundTask<V> finishedBackgroundTask() {
+        return new BackgroundTask<>();
+    }
+
+    /**
+     * Creates an initial background task. This means that this background 
task has no predecessor.
+     *
+     * @param task task to run
+     * @param executor executor to run the task
+     * @param <V> type of the result
+     * @return initial {@link BackgroundTask} representing the task to execute
+     */
+    static <V> BackgroundTask<V> initialBackgroundTask(
+            SupplierWithException<? extends V, ? extends Exception> task, 
Executor executor) {
+        return new BackgroundTask<>(FutureUtils.completedVoidFuture(), task, 
executor);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 9bbf7a5..af7f00d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -568,7 +568,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testGoToFinishedShutsDownCheckpointingComponents() throws 
Exception {
+    public void testCloseShutsDownCheckpointingComponents() throws Exception {
         final CompletableFuture<JobStatus> 
completedCheckpointStoreShutdownFuture =
                 new CompletableFuture<>();
         final CompletedCheckpointStore completedCheckpointStore =
@@ -586,16 +586,19 @@ public class AdaptiveSchedulerTest extends TestLogger {
                         CheckpointCoordinatorConfiguration.builder().build(), 
null));
 
         final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
                         .setCheckpointRecoveryFactory(
                                 new TestingCheckpointRecoveryFactory(
                                         completedCheckpointStore, 
checkpointIdCounter))
                         .build();
 
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
-
-        scheduler.goToFinished(archivedExecutionGraph);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    // transition into the FAILED state
+                    scheduler.handleGlobalFailure(new FlinkException("Test 
exception"));
+                    scheduler.closeAsync();
+                });
 
         assertThat(completedCheckpointStoreShutdownFuture.get(), 
is(JobStatus.FAILED));
         assertThat(checkpointIdCounterShutdownFuture.get(), 
is(JobStatus.FAILED));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java
new file mode 100644
index 0000000..d224ff6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link BackgroundTask}. */
+public class BackgroundTaskTest extends TestLogger {
+
+    @ClassRule
+    public static final TestExecutorResource<ExecutorService> 
TEST_EXECUTOR_RESOURCE =
+            new TestExecutorResource<>(() -> Executors.newFixedThreadPool(2));
+
+    @Test
+    public void testFinishedBackgroundTaskIsTerminated() {
+        final BackgroundTask<Void> finishedBackgroundTask = 
BackgroundTask.finishedBackgroundTask();
+
+        assertTrue(finishedBackgroundTask.getTerminationFuture().isDone());
+        finishedBackgroundTask.getTerminationFuture().join();
+    }
+
+    @Test
+    public void testFinishedBackgroundTaskDoesNotContainAResult() {
+        final BackgroundTask<Void> finishedBackgroundTask = 
BackgroundTask.finishedBackgroundTask();
+
+        
assertTrue(finishedBackgroundTask.getResultFuture().isCompletedExceptionally());
+    }
+
+    @Test
+    public void testNormalCompletionOfBackgroundTask() {
+        final String expectedValue = "foobar";
+        final BackgroundTask<String> backgroundTask =
+                BackgroundTask.finishedBackgroundTask()
+                        .runAfter(() -> expectedValue, 
TEST_EXECUTOR_RESOURCE.getExecutor());
+
+        assertThat(backgroundTask.getResultFuture().join(), 
Matchers.is(expectedValue));
+        // check that the termination future has completed normally
+        backgroundTask.getTerminationFuture().join();
+    }
+
+    @Test
+    public void testExceptionalCompletionOfBackgroundTask() throws 
InterruptedException {
+        final Exception expectedException = new Exception("Test exception");
+        final BackgroundTask<String> backgroundTask =
+                BackgroundTask.finishedBackgroundTask()
+                        .runAfter(
+                                () -> {
+                                    throw expectedException;
+                                },
+                                TEST_EXECUTOR_RESOURCE.getExecutor());
+
+        try {
+            backgroundTask.getResultFuture().get();
+            fail("Expected an exceptionally completed result future.");
+        } catch (ExecutionException ee) {
+            assertThat(ee, FlinkMatchers.containsCause(expectedException));
+        }
+        // check that the termination future has completed normally
+        backgroundTask.getTerminationFuture().join();
+    }
+
+    @Test
+    public void testRunAfterExecutesBackgroundTaskAfterPreviousHasCompleted() {
+        final OneShotLatch blockingLatch = new OneShotLatch();
+        final BlockingQueue<Integer> taskCompletions = new 
ArrayBlockingQueue<>(2);
+        final BackgroundTask<Void> backgroundTask =
+                BackgroundTask.initialBackgroundTask(
+                                () -> {
+                                    blockingLatch.await();
+                                    taskCompletions.offer(1);
+                                    return null;
+                                },
+                                TEST_EXECUTOR_RESOURCE.getExecutor())
+                        .runAfter(
+                                () -> {
+                                    taskCompletions.offer(2);
+                                    return null;
+                                },
+                                TEST_EXECUTOR_RESOURCE.getExecutor());
+
+        blockingLatch.trigger();
+
+        backgroundTask.getTerminationFuture().join();
+
+        assertThat(taskCompletions, Matchers.contains(1, 2));
+    }
+
+    @Test
+    public void testAbortSkipsTasksWhichHaveNotBeenStarted() {
+        final OneShotLatch blockingLatch = new OneShotLatch();
+        final BlockingQueue<Integer> taskCompletions = new 
ArrayBlockingQueue<>(2);
+        final BackgroundTask<Void> backgroundTask =
+                BackgroundTask.initialBackgroundTask(
+                                () -> {
+                                    blockingLatch.await();
+                                    taskCompletions.offer(1);
+                                    return null;
+                                },
+                                TEST_EXECUTOR_RESOURCE.getExecutor())
+                        .runAfter(
+                                () -> {
+                                    taskCompletions.offer(2);
+                                    return null;
+                                },
+                                TEST_EXECUTOR_RESOURCE.getExecutor());
+
+        final BackgroundTask<Void> finalTask =
+                backgroundTask.runAfter(
+                        () -> {
+                            taskCompletions.offer(3);
+                            return null;
+                        },
+                        TEST_EXECUTOR_RESOURCE.getExecutor());
+
+        backgroundTask.abort();
+
+        blockingLatch.trigger();
+
+        finalTask.getTerminationFuture().join();
+
+        assertThat(taskCompletions, Matchers.contains(1, 3));
+    }
+}

Reply via email to