tillrohrmann commented on a change in pull request #15251: URL: https://github.com/apache/flink/pull/15251#discussion_r597595486
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +/** + * State which waits for the creation of the {@link ExecutionGraph}. If the creation fails, then the + * state transitions to {@link Finished}. If the creation succeeds, then the system tries to assign + * the required slots. If the set of available slots has changed so that the created {@link + * ExecutionGraph} cannot be executed, the state transitions back into {@link WaitingForResources}. + * If there are enough slots for the {@link ExecutionGraph} to run, the state transitions to {@link + * Executing}. + */ +public class CreatingExecutionGraph implements State { + + private final Context context; + + private final Logger log; + + public CreatingExecutionGraph( + Context context, CompletableFuture<ExecutionGraph> executionGraphFuture, Logger log) { + this.context = context; + this.log = log; + + FutureUtils.assertNoException( + executionGraphFuture.handle( + (executionGraph, throwable) -> { + context.runIfState( + this, + () -> handleExecutionGraphCreation(executionGraph, throwable), + Duration.ZERO); + return null; + })); + } + + private void handleExecutionGraphCreation( + @Nullable ExecutionGraph executionGraph, @Nullable Throwable throwable) { + if (throwable != null) { + log.info( + "Failed to go from {} to {} because the ExecutionGraph creation failed.", + CreatingExecutionGraph.class.getSimpleName(), + Executing.class.getSimpleName(), + throwable); + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable)); + } else { + final AssignmentResult result = context.tryToAssignSlots(executionGraph); + + if (result.isSuccess()) { + context.goToExecuting(result.getExecutionGraph()); + } else { + context.goToWaitingForResources(); + } + } + } + + @Override + public void cancel() { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); + } + + @Override + public void suspend(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); + } + + @Override + public JobStatus getJobStatus() { + return JobStatus.INITIALIZING; + } + + @Override + public ArchivedExecutionGraph getJob() { + return context.getArchivedExecutionGraph(getJobStatus(), null); + } + + @Override + public void handleGlobalFailure(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); + } + + @Override + public Logger getLogger() { + return log; + } + + /** Context for the {@link CreatingExecutionGraph} state. */ + interface Context { + + /** + * Transitions into the {@link Finished} state. + * + * @param archivedExecutionGraph archivedExecutionGraph representing the final job state + */ + void goToFinished(ArchivedExecutionGraph archivedExecutionGraph); + + /** + * Transitions into the {@link Executing} state. + * + * @param executionGraph executionGraph which is passed to the {@link Executing} state + */ + void goToExecuting(ExecutionGraph executionGraph); + + /** Transitions into the {@link WaitingForResources} state. */ + void goToWaitingForResources(); + + /** + * Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can + * be null if there is no failure. + * + * @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with + * @param cause cause describing a failure cause; {@code null} if there is none + * @return the created {@link ArchivedExecutionGraph} + */ + ArchivedExecutionGraph getArchivedExecutionGraph( + JobStatus jobStatus, @Nullable Throwable cause); + + /** + * Runs the given action after a delay if the state at this time equals the expected state. + * + * @param expectedState expectedState describes the required state at the time of running + * the action + * @param action action to run if the expected state equals the actual state + * @param delay delay after which to run the action + */ + void runIfState(State expectedState, Runnable action, Duration delay); + + /** + * Try to assign slots to the created {@link ExecutionGraph}. If it is possible, then this + * method returns a successful {@link AssignmentResult} which contains the assigned {@link + * ExecutionGraph}. If not, then the assignment result is a failure. + * + * @param executionGraph executionGraph to assign slots to + * @return {@link AssignmentResult} representing the result of the assignment + */ + AssignmentResult tryToAssignSlots(ExecutionGraph executionGraph); + } + + /** + * Class representing the assignment result of the slots to the {@link ExecutionGraph}. The + * assignment is either successful or not possible. If it is successful, the assignment also + * contains the assigned {@link ExecutionGraph}. + */ + static final class AssignmentResult { + + private static final AssignmentResult NOT_POSSIBLE = new AssignmentResult(null); + + @Nullable private final ExecutionGraph executionGraph; + + private AssignmentResult(@Nullable ExecutionGraph executionGraph) { + this.executionGraph = executionGraph; + } + + boolean isSuccess() { + return executionGraph != null; + } + + ExecutionGraph getExecutionGraph() { + Preconditions.checkState( + isSuccess(), "Can only return the ExecutionGraph if it is a success."); + return executionGraph; + } + + static AssignmentResult success(ExecutionGraph executionGraph) { + return new AssignmentResult(executionGraph); Review comment: I will add it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
