zentol commented on a change in pull request #15251: URL: https://github.com/apache/flink/pull/15251#discussion_r595951751
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +/** + * State which waits for the creation of the {@link ExecutionGraph}. If the creation fails, then the + * state transitions to {@link Finished}. If the creation succeeds, then the system tries to assign + * the required slots. If the set of available slots has changed so that the created {@link + * ExecutionGraph} cannot be executed, the state transitions back into {@link WaitingForResources}. + * If there are enough slots for the {@link ExecutionGraph} to run, the state transitions to {@link + * Executing}. 
+ */ +public class CreatingExecutionGraph implements State { + + private final Context context; + + private final Logger log; + + public CreatingExecutionGraph( + Context context, CompletableFuture<ExecutionGraph> executionGraphFuture, Logger log) { + this.context = context; + this.log = log; + + FutureUtils.assertNoException( + executionGraphFuture.handle( + (executionGraph, throwable) -> { + context.runIfState( + this, + () -> handleExecutionGraphCreation(executionGraph, throwable), + Duration.ZERO); + return null; + })); + } + + private void handleExecutionGraphCreation( + @Nullable ExecutionGraph executionGraph, @Nullable Throwable throwable) { + if (throwable != null) { + log.info( + "Failed to go from {} to {} because the ExecutionGraph creation failed.", + CreatingExecutionGraph.class.getSimpleName(), + Executing.class.getSimpleName(), + throwable); + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable)); + } else { + final AssignmentResult result = context.tryToAssignSlots(executionGraph); + + if (result.isSuccess()) { + context.goToExecuting(result.getExecutionGraph()); + } else { + context.goToWaitingForResources(); + } + } + } + + @Override + public void cancel() { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); + } + + @Override + public void suspend(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); + } + + @Override + public JobStatus getJobStatus() { + return JobStatus.INITIALIZING; + } + + @Override + public ArchivedExecutionGraph getJob() { + return context.getArchivedExecutionGraph(getJobStatus(), null); + } + + @Override + public void handleGlobalFailure(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); + } + + @Override + public Logger getLogger() { + return log; + } + + /** Context for the {@link CreatingExecutionGraph} state. 
*/ + interface Context { + + /** + * Transitions into the {@link Finished} state. + * + * @param archivedExecutionGraph archivedExecutionGraph representing the final job state + */ + void goToFinished(ArchivedExecutionGraph archivedExecutionGraph); + + /** + * Transitions into the {@link Executing} state. + * + * @param executionGraph executionGraph which is passed to the {@link Executing} state + */ + void goToExecuting(ExecutionGraph executionGraph); + + /** Transitions into the {@link WaitingForResources} state. */ + void goToWaitingForResources(); + + /** + * Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can + * be null if there is no failure. + * + * @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with + * @param cause cause describing a failure cause; {@code null} if there is none + * @return the created {@link ArchivedExecutionGraph} + */ + ArchivedExecutionGraph getArchivedExecutionGraph( + JobStatus jobStatus, @Nullable Throwable cause); + + /** + * Runs the given action after a delay if the state at this time equals the expected state. + * + * @param expectedState expectedState describes the required state at the time of running + * the action + * @param action action to run if the expected state equals the actual state + * @param delay delay after which to run the action + */ + void runIfState(State expectedState, Runnable action, Duration delay); + + /** + * Try to assign slots to the created {@link ExecutionGraph}. If it is possible, then this + * method returns a successful {@link AssignmentResult} which contains the assigned {@link + * ExecutionGraph}. If not, then the assignment result is a failure. 
+ * + * @param executionGraph executionGraph to assign slots to + * @return {@link AssignmentResult} representing the result of the assignment + */ + AssignmentResult tryToAssignSlots(ExecutionGraph executionGraph); + } + + /** + * Class representing the assignment result of the slots to the {@link ExecutionGraph}. The + * assignment is either successful or not possible. If it is successful, the assignment also + * contains the assigned {@link ExecutionGraph}. + */ + static final class AssignmentResult { + + private static final AssignmentResult NOT_POSSIBLE = new AssignmentResult(null); + + @Nullable private final ExecutionGraph executionGraph; + + private AssignmentResult(@Nullable ExecutionGraph executionGraph) { + this.executionGraph = executionGraph; + } + + boolean isSuccess() { + return executionGraph != null; + } + + ExecutionGraph getExecutionGraph() { + Preconditions.checkState( + isSuccess(), "Can only return the ExecutionGraph if it is a success."); + return executionGraph; + } + + static AssignmentResult success(ExecutionGraph executionGraph) { + return new AssignmentResult(executionGraph); Review comment: checkNotNull ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.testutils.executor.TestExecutorResource; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Matchers; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for the {@link BackgroundTask}. */ +public class BackgroundTaskTest extends TestLogger { Review comment: We should also add a test that aborting a future does not affect subsequent BackgroundTasks.
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java ########## @@ -722,25 +727,50 @@ public void testRestoringModifiedJobFromSavepointFails() throws Exception { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID()); - final AdaptiveScheduler adaptiveScheduler = - new AdaptiveSchedulerBuilder(jobGraphWithNewOperator, mainThreadExecutor) - .setDeclarativeSlotPool(declarativeSlotPool) - .build(); - - adaptiveScheduler.startScheduling(); - - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1))); - - final ArchivedExecutionGraph archivedExecutionGraph = - adaptiveScheduler.requestJob().getArchivedExecutionGraph(); - - assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); - assertThat( - archivedExecutionGraph.getFailureInfo().getException(), - FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint")); + final GloballyTerminalJobStatusListener jobStatusListener = + new GloballyTerminalJobStatusListener(); + + final ScheduledExecutorService singleThreadExecutor = + Executors.newSingleThreadScheduledExecutor(); + + try { + final ComponentMainThreadExecutor singleMainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( Review comment: Could we make our life easier by adding an ExecutionGraphFactory to the AdaptiveScheduler, that during tests builds the EG in a blocking fashion? I'm not looking forward to having to deal with separate main thread executors again... 
😠########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java ########## @@ -148,7 +149,17 @@ public ResourceCounter calculateRequiredSlots( @Override public Map<ExecutionVertexID, LogicalSlot> reserveResources( - VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing) { + VertexParallelism vertexParallelism) { + Preconditions.checkArgument( + vertexParallelism instanceof VertexParallelismWithSlotSharing, Review comment: An alternative could be to add a `assignResources()` method to the `VertexParallelism`, and store a reference to the SlotAllocator in the `VertexParallelismWithSlotSharing`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ########## @@ -1344,4 +1344,26 @@ public static void throwIfCompletedExceptionally(CompletableFuture<?> future) th } }; } + + /** + * Switches the execution context of the given source future. This works for normally and + * exceptionally completed futures. 
+ * + * @param source source to switch the execution context for + * @param executor executor representing the new execution context + * @param <T> type of the source + * @return future which is executed by the given executor + */ + public static <T> CompletableFuture<T> switchExecutor( Review comment: _neat_ ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java ########## @@ -713,58 +698,97 @@ public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { @Override public void goToCreatingExecutionGraph() { - CompletableFuture<ExecutionGraph> executionGraphFuture; + final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> + executionGraphWithAvailableResourcesFuture = + createExecutionGraphWithAvailableResourcesAsync(); + + transitionToState( + new CreatingExecutionGraph.Factory( + this, executionGraphWithAvailableResourcesFuture, LOG)); + } + + private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> + createExecutionGraphWithAvailableResourcesAsync() { + final JobGraph adjustedJobGraph; + final VertexParallelism vertexParallelism; try { - final ExecutionGraph executionGraph = createExecutionGraphWithAvailableResources(); - executionGraphFuture = CompletableFuture.completedFuture(executionGraph); + vertexParallelism = determineParallelism(slotAllocator); + + adjustedJobGraph = jobInformation.copyJobGraph(); + for (JobVertex vertex : adjustedJobGraph.getVertices()) { + vertex.setParallelism(vertexParallelism.getParallelism(vertex.getID())); + } } catch (Exception exception) { - executionGraphFuture = FutureUtils.completedExceptionally(exception); + return FutureUtils.completedExceptionally(exception); } - transitionToState(new CreatingExecutionGraph.Factory(this, executionGraphFuture, LOG)); + return createExecutionGraphAndRestoreStateAsync(adjustedJobGraph) + .thenApply( + executionGraph -> + 
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( + executionGraph, vertexParallelism)); } - ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception { - final ParallelismAndResourceAssignments parallelismAndResourceAssignments = - determineParallelismAndAssignResources(slotAllocator); - - JobGraph adjustedJobGraph = jobInformation.copyJobGraph(); - for (JobVertex vertex : adjustedJobGraph.getVertices()) { - vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID())); - } - + @Override + public CreatingExecutionGraph.AssignmentResult tryToAssignSlots( + CreatingExecutionGraph.ExecutionGraphWithVertexParallelism + executionGraphWithVertexParallelism) { final ExecutionGraph executionGraph = - createExecutionGraphAndRestoreStateAsync(adjustedJobGraph).join(); + executionGraphWithVertexParallelism.getExecutionGraph(); executionGraph.start(componentMainThreadExecutor); executionGraph.transitionToRunning(); executionGraph.setInternalTaskFailuresListener( new UpdateSchedulerNgOnInternalFailuresListener(this)); + final VertexParallelism vertexParallelism = + executionGraphWithVertexParallelism.getVertexParallelism(); + return slotAllocator + .tryReserveResources(vertexParallelism) + .map( + reservedSlots -> + CreatingExecutionGraph.AssignmentResult.success( + assignSlotsToExecutionGraph(executionGraph, reservedSlots))) + .orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible); + } + + @Nonnull + private ExecutionGraph assignSlotsToExecutionGraph( + ExecutionGraph executionGraph, + org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots assignedSlots) { Review comment: odd import ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/BackgroundTask.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.SupplierWithException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * BackgroundTask encapsulates an asynchronous operation which can produce a result. The result can + * be accessed via {@link BackgroundTask#getResultFuture()}. Additionally, the task allows to track + * its completion via {@link BackgroundTask#getTerminationFuture()}. + * + * <p>In order to ensure the order of background tasks, one can use the {@link + * BackgroundTask#runAfter} to schedule tasks which are executed after this task has completed. 
+ * + * @param <T> type of the produced result + */ +final class BackgroundTask<T> { + private final CompletableFuture<Void> terminationFuture; + + private final CompletableFuture<T> resultFuture; + + private volatile boolean isValid = true; Review comment: "valid" feels like an odd naming choice; instinctively I would have expected "isAborted" (+ negating conditions of course) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java ########## @@ -45,27 +42,26 @@ * <p>If a {@link VertexParallelism} is returned then it covers all vertices contained in the * given job information. * - * <p>A returned {@link VertexParallelism} should be directly consumed afterwards (by either - * discarding it or calling {@link #reserveResources(VertexParallelism)}, as there is no - * guarantee that the assignment remains valid over time (because slots can be lost). - * * <p>Implementations of this method must be side-effect free. There is no guarantee that the - * result of this method is ever passed to {@link #reserveResources(VertexParallelism)}. + * result of this method is ever passed to {@link #tryReserveResources(VertexParallelism)}. * * @param jobInformation information about the job graph * @param slots slots to consider for determining the parallelism * @return potential parallelism for all vertices and implementation-specific information for * how the vertices could be assigned to slots, if all vertices could be run with the given * slots */ - Optional<T> determineParallelism( + Optional<? extends VertexParallelism> determineParallelism( JobInformation jobInformation, Collection<? extends SlotInfo> slots); /** - * Reserves slots according to the given assignment. + * Reserves slots according to the given assignment if possible. If the underlying set of + * resources has changed and the reservation with respect to vertexParallelism is no longer + * possible, then this method returns {@link Optional#empty()}. 
* * @param vertexParallelism information on how slots should be assigned to the slots - * @return mapping of vertices to slots + * @return Set of reserved slots if the reservation was successful; otherwise {@link + * Optional#empty()} */ - Map<ExecutionVertexID, LogicalSlot> reserveResources(T vertexParallelism); + Optional<? extends ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism); Review comment: Why isn't this just returning `ReservedSlots`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
