zentol commented on a change in pull request #15251:
URL: https://github.com/apache/flink/pull/15251#discussion_r595951751



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * State which waits for the creation of the {@link ExecutionGraph}. If the 
creation fails, then the
+ * state transitions to {@link Finished}. If the creation succeeds, then the 
system tries to assign
+ * the required slots. If the set of available slots has changed so that the 
created {@link
+ * ExecutionGraph} cannot be executed, the state transitions back into {@link 
WaitingForResources}.
+ * If there are enough slots for the {@link ExecutionGraph} to run, the state 
transitions to {@link
+ * Executing}.
+ */
+public class CreatingExecutionGraph implements State {
+
+    private final Context context;
+
+    private final Logger log;
+
+    public CreatingExecutionGraph(
+            Context context, CompletableFuture<ExecutionGraph> 
executionGraphFuture, Logger log) {
+        this.context = context;
+        this.log = log;
+
+        FutureUtils.assertNoException(
+                executionGraphFuture.handle(
+                        (executionGraph, throwable) -> {
+                            context.runIfState(
+                                    this,
+                                    () -> 
handleExecutionGraphCreation(executionGraph, throwable),
+                                    Duration.ZERO);
+                            return null;
+                        }));
+    }
+
+    private void handleExecutionGraphCreation(
+            @Nullable ExecutionGraph executionGraph, @Nullable Throwable 
throwable) {
+        if (throwable != null) {
+            log.info(
+                    "Failed to go from {} to {} because the ExecutionGraph 
creation failed.",
+                    CreatingExecutionGraph.class.getSimpleName(),
+                    Executing.class.getSimpleName(),
+                    throwable);
+            
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
throwable));
+        } else {
+            final AssignmentResult result = 
context.tryToAssignSlots(executionGraph);
+
+            if (result.isSuccess()) {
+                context.goToExecuting(result.getExecutionGraph());
+            } else {
+                context.goToWaitingForResources();
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, 
null));
+    }
+
+    @Override
+    public void suspend(Throwable cause) {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, 
cause));
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        return JobStatus.INITIALIZING;
+    }
+
+    @Override
+    public ArchivedExecutionGraph getJob() {
+        return context.getArchivedExecutionGraph(getJobStatus(), null);
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
cause));
+    }
+
+    @Override
+    public Logger getLogger() {
+        return log;
+    }
+
+    /** Context for the {@link CreatingExecutionGraph} state. */
+    interface Context {
+
+        /**
+         * Transitions into the {@link Finished} state.
+         *
+         * @param archivedExecutionGraph archivedExecutionGraph representing 
the final job state
+         */
+        void goToFinished(ArchivedExecutionGraph archivedExecutionGraph);
+
+        /**
+         * Transitions into the {@link Executing} state.
+         *
+         * @param executionGraph executionGraph which is passed to the {@link 
Executing} state
+         */
+        void goToExecuting(ExecutionGraph executionGraph);
+
+        /** Transitions into the {@link WaitingForResources} state. */
+        void goToWaitingForResources();
+
+        /**
+         * Creates the {@link ArchivedExecutionGraph} for the given job status 
and cause. Cause can
+         * be null if there is no failure.
+         *
+         * @param jobStatus jobStatus to initialize the {@link 
ArchivedExecutionGraph} with
+         * @param cause cause describing a failure cause; {@code null} if 
there is none
+         * @return the created {@link ArchivedExecutionGraph}
+         */
+        ArchivedExecutionGraph getArchivedExecutionGraph(
+                JobStatus jobStatus, @Nullable Throwable cause);
+
+        /**
+         * Runs the given action after a delay if the state at this time 
equals the expected state.
+         *
+         * @param expectedState expectedState describes the required state at 
the time of running
+         *     the action
+         * @param action action to run if the expected state equals the actual 
state
+         * @param delay delay after which to run the action
+         */
+        void runIfState(State expectedState, Runnable action, Duration delay);
+
+        /**
+         * Try to assign slots to the created {@link ExecutionGraph}. If it is 
possible, then this
+         * method returns a successful {@link AssignmentResult} which contains 
the assigned {@link
+         * ExecutionGraph}. If not, then the assignment result is a failure.
+         *
+         * @param executionGraph executionGraph to assign slots to
+         * @return {@link AssignmentResult} representing the result of the 
assignment
+         */
+        AssignmentResult tryToAssignSlots(ExecutionGraph executionGraph);
+    }
+
+    /**
+     * Class representing the assignment result of the slots to the {@link 
ExecutionGraph}. The
+     * assignment is either successful or not possible. If it is successful, 
the assignment also
+     * contains the assigned {@link ExecutionGraph}.
+     */
+    static final class AssignmentResult {
+
+        private static final AssignmentResult NOT_POSSIBLE = new 
AssignmentResult(null);
+
+        @Nullable private final ExecutionGraph executionGraph;
+
+        private AssignmentResult(@Nullable ExecutionGraph executionGraph) {
+            this.executionGraph = executionGraph;
+        }
+
+        boolean isSuccess() {
+            return executionGraph != null;
+        }
+
+        ExecutionGraph getExecutionGraph() {
+            Preconditions.checkState(
+                    isSuccess(), "Can only return the ExecutionGraph if it is 
a success.");
+            return executionGraph;
+        }
+
+        static AssignmentResult success(ExecutionGraph executionGraph) {
+            return new AssignmentResult(executionGraph);

Review comment:
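       Please add a `checkNotNull` here (e.g. `Preconditions.checkNotNull(executionGraph)`), since a null argument would silently produce a failed result.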

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link BackgroundTask}. */
+public class BackgroundTaskTest extends TestLogger {

Review comment:
       We should also add a test that checks how aborting a future affects subsequent BackgroundTasks.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -722,25 +727,50 @@ public void testRestoringModifiedJobFromSavepointFails() 
throws Exception {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
 
-        final AdaptiveScheduler adaptiveScheduler =
-                new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, 
mainThreadExecutor)
-                        .setDeclarativeSlotPool(declarativeSlotPool)
-                        .build();
-
-        adaptiveScheduler.startScheduling();
-
-        offerSlots(
-                declarativeSlotPool,
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
1)));
-
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                adaptiveScheduler.requestJob().getArchivedExecutionGraph();
-
-        assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
-        assertThat(
-                archivedExecutionGraph.getFailureInfo().getException(),
-                FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
+        final GloballyTerminalJobStatusListener jobStatusListener =
+                new GloballyTerminalJobStatusListener();
+
+        final ScheduledExecutorService singleThreadExecutor =
+                Executors.newSingleThreadScheduledExecutor();
+
+        try {
+            final ComponentMainThreadExecutor singleMainThreadExecutor =
+                    
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(

Review comment:
       Could we make our lives easier by adding an ExecutionGraphFactory to the AdaptiveScheduler that builds the ExecutionGraph in a blocking fashion during tests?
   I'm not looking forward to having to deal with separate main thread executors again... 😭 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
##########
@@ -148,7 +149,17 @@ public ResourceCounter calculateRequiredSlots(
 
     @Override
     public Map<ExecutionVertexID, LogicalSlot> reserveResources(
-            VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing) 
{
+            VertexParallelism vertexParallelism) {
+        Preconditions.checkArgument(
+                vertexParallelism instanceof VertexParallelismWithSlotSharing,

Review comment:
       An alternative could be to add an `assignResources()` method to the `VertexParallelism`, and store a reference to the SlotAllocator in the `VertexParallelismWithSlotSharing`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -1344,4 +1344,26 @@ public static void 
throwIfCompletedExceptionally(CompletableFuture<?> future) th
             }
         };
     }
+
+    /**
+     * Switches the execution context of the given source future. This works 
for normally and
+     * exceptionally completed futures.
+     *
+     * @param source source to switch the execution context for
+     * @param executor executor representing the new execution context
+     * @param <T> type of the source
+     * @return future which is executed by the given executor
+     */
+    public static <T> CompletableFuture<T> switchExecutor(

Review comment:
       _neat_

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -713,58 +698,97 @@ public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
 
     @Override
     public void goToCreatingExecutionGraph() {
-        CompletableFuture<ExecutionGraph> executionGraphFuture;
+        final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                executionGraphWithAvailableResourcesFuture =
+                        createExecutionGraphWithAvailableResourcesAsync();
+
+        transitionToState(
+                new CreatingExecutionGraph.Factory(
+                        this, executionGraphWithAvailableResourcesFuture, 
LOG));
+    }
+
+    private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+            createExecutionGraphWithAvailableResourcesAsync() {
+        final JobGraph adjustedJobGraph;
+        final VertexParallelism vertexParallelism;
 
         try {
-            final ExecutionGraph executionGraph = 
createExecutionGraphWithAvailableResources();
-            executionGraphFuture = 
CompletableFuture.completedFuture(executionGraph);
+            vertexParallelism = determineParallelism(slotAllocator);
+
+            adjustedJobGraph = jobInformation.copyJobGraph();
+            for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+                
vertex.setParallelism(vertexParallelism.getParallelism(vertex.getID()));
+            }
         } catch (Exception exception) {
-            executionGraphFuture = 
FutureUtils.completedExceptionally(exception);
+            return FutureUtils.completedExceptionally(exception);
         }
 
-        transitionToState(new CreatingExecutionGraph.Factory(this, 
executionGraphFuture, LOG));
+        return createExecutionGraphAndRestoreStateAsync(adjustedJobGraph)
+                .thenApply(
+                        executionGraph ->
+                                
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                                        executionGraph, vertexParallelism));
     }
 
-    ExecutionGraph createExecutionGraphWithAvailableResources() throws 
Exception {
-        final ParallelismAndResourceAssignments 
parallelismAndResourceAssignments =
-                determineParallelismAndAssignResources(slotAllocator);
-
-        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
-        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
-            
vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
-        }
-
+    @Override
+    public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
+            CreatingExecutionGraph.ExecutionGraphWithVertexParallelism
+                    executionGraphWithVertexParallelism) {
         final ExecutionGraph executionGraph =
-                
createExecutionGraphAndRestoreStateAsync(adjustedJobGraph).join();
+                executionGraphWithVertexParallelism.getExecutionGraph();
 
         executionGraph.start(componentMainThreadExecutor);
         executionGraph.transitionToRunning();
 
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
 
+        final VertexParallelism vertexParallelism =
+                executionGraphWithVertexParallelism.getVertexParallelism();
+        return slotAllocator
+                .tryReserveResources(vertexParallelism)
+                .map(
+                        reservedSlots ->
+                                
CreatingExecutionGraph.AssignmentResult.success(
+                                        
assignSlotsToExecutionGraph(executionGraph, reservedSlots)))
+                
.orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
+    }
+
+    @Nonnull
+    private ExecutionGraph assignSlotsToExecutionGraph(
+            ExecutionGraph executionGraph,
+            
org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots 
assignedSlots) {

Review comment:
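       Odd import: `ReservedSlots` is referenced by its fully qualified name here; please import it normally instead.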

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * BackgroundTask encapsulates an asynchronous operation which can produce a 
result. The result can
+ * be accessed via {@link BackgroundTask#getResultFuture()}. Additionally, the 
task allows to track
+ * its completion via {@link BackgroundTask#getTerminationFuture()}.
+ *
+ * <p>In order to ensure the order of background tasks, one can use the {@link
+ * BackgroundTask#runAfter} to schedule tasks which are executed after this 
task has completed.
+ *
+ * @param <T> type of the produced result
+ */
+final class BackgroundTask<T> {
+    private final CompletableFuture<Void> terminationFuture;
+
+    private final CompletableFuture<T> resultFuture;
+
+    private volatile boolean isValid = true;

Review comment:
       "valid" feels like an odd naming choice; instinctively, I would have expected "isAborted" (with the conditions negated accordingly, of course).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
##########
@@ -45,27 +42,26 @@
      * <p>If a {@link VertexParallelism} is returned then it covers all 
vertices contained in the
      * given job information.
      *
-     * <p>A returned {@link VertexParallelism} should be directly consumed 
afterwards (by either
-     * discarding it or calling {@link #reserveResources(VertexParallelism)}, 
as there is no
-     * guarantee that the assignment remains valid over time (because slots 
can be lost).
-     *
      * <p>Implementations of this method must be side-effect free. There is no 
guarantee that the
-     * result of this method is ever passed to {@link 
#reserveResources(VertexParallelism)}.
+     * result of this method is ever passed to {@link 
#tryReserveResources(VertexParallelism)}.
      *
      * @param jobInformation information about the job graph
      * @param slots slots to consider for determining the parallelism
      * @return potential parallelism for all vertices and 
implementation-specific information for
      *     how the vertices could be assigned to slots, if all vertices could 
be run with the given
      *     slots
      */
-    Optional<T> determineParallelism(
+    Optional<? extends VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
slots);
 
     /**
-     * Reserves slots according to the given assignment.
+     * Reserves slots according to the given assignment if possible. If the 
underlying set of
+     * resources has changed and the reservation with respect to 
vertexParallelism is no longer
+     * possible, then this method returns {@link Optional#empty()}.
      *
      * @param vertexParallelism information on how slots should be assigned to 
the slots
-     * @return mapping of vertices to slots
+     * @return Set of reserved slots if the reservation was successful; 
otherwise {@link
+     *     Optional#empty()}
      */
-    Map<ExecutionVertexID, LogicalSlot> reserveResources(T vertexParallelism);
+    Optional<? extends ReservedSlots> tryReserveResources(VertexParallelism 
vertexParallelism);

Review comment:
       Why isn't this just returning `ReservedSlots` instead of `Optional<? extends ReservedSlots>`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to