This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc0f2c698b68960d227d5c7dc8a06a52b7138888 Author: Till Rohrmann <[email protected]> AuthorDate: Sun Mar 14 00:12:38 2021 +0100 [FLINK-21602] Add CreatingExecutionGraph state The CreatingExecutionGraph state models the asynchronous ExecutionGraph creation after enough resources have been acquired in the WaitingForResources state. --- .../scheduler/adaptive/AdaptiveScheduler.java | 231 +++++++++++---------- .../scheduler/adaptive/CreatingExecutionGraph.java | 231 +++++++++++++++++++++ .../scheduler/adaptive/WaitingForResources.java | 31 +-- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 2 +- .../adaptive/CreatingExecutionGraphTest.java | 215 +++++++++++++++++++ .../adaptive/WaitingForResourcesTest.java | 62 +----- 6 files changed, 585 insertions(+), 187 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 0998faf..1c5763e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -149,6 +149,7 @@ public class AdaptiveScheduler implements SchedulerNG, Created.Context, WaitingForResources.Context, + CreatingExecutionGraph.Context, Executing.Context, Restarting.Context, Failing.Context, @@ -599,112 +600,6 @@ public class AdaptiveScheduler } @Override - public ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception { - final ParallelismAndResourceAssignments parallelismAndResourceAssignments = - determineParallelismAndAssignResources(slotAllocator); - - JobGraph adjustedJobGraph = jobInformation.copyJobGraph(); - for (JobVertex vertex : adjustedJobGraph.getVertices()) { - vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID())); - } - - final ExecutionGraph executionGraph = createExecutionGraphAndRestoreState(adjustedJobGraph); - - executionGraph.start(componentMainThreadExecutor); - executionGraph.transitionToRunning(); - - executionGraph.setInternalTaskFailuresListener( - new UpdateSchedulerNgOnInternalFailuresListener(this)); - - for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) { - final LogicalSlot assignedSlot = - parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID()); - executionVertex - .getCurrentExecutionAttempt() - .registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false); - executionVertex.tryAssignResource(assignedSlot); - } - return executionGraph; - } - - private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph) - throws Exception { - ExecutionDeploymentListener executionDeploymentListener = - new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker); - ExecutionStateUpdateListener executionStateUpdateListener = - (execution, newState) -> { - if (newState.isTerminal()) { - executionDeploymentTracker.stopTrackingDeploymentOf(execution); - } - }; - - final ExecutionGraph newExecutionGraph = - DefaultExecutionGraphBuilder.buildGraph( - adjustedJobGraph, - configuration, - futureExecutor, - ioExecutor, - userCodeClassLoader, - completedCheckpointStore, - checkpointsCleaner, - checkpointIdCounter, - rpcTimeout, - jobManagerJobMetricGroup, - blobWriter, - LOG, - shuffleMaster, - partitionTracker, - TaskDeploymentDescriptorFactory.PartitionLocationConstraint - .MUST_BE_KNOWN, // AdaptiveScheduler only supports streaming jobs - executionDeploymentListener, - executionStateUpdateListener, - initializationTimestamp, - vertexAttemptNumberStore); - - final CheckpointCoordinator checkpointCoordinator = - newExecutionGraph.getCheckpointCoordinator(); - - if (checkpointCoordinator != null) { - // check whether we find a valid checkpoint - if (!checkpointCoordinator.restoreInitialCheckpointIfPresent( - new HashSet<>(newExecutionGraph.getAllVertices().values()))) { - - // check whether we can restore from a savepoint - tryRestoreExecutionGraphFromSavepoint( - newExecutionGraph, adjustedJobGraph.getSavepointRestoreSettings()); - } - } - - return newExecutionGraph; - } - - /** - * Tries to restore the given {@link ExecutionGraph} from the provided {@link - * SavepointRestoreSettings}, iff checkpointing is enabled. - * - * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored - * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about - * the savepoint to restore from - * @throws Exception if the {@link ExecutionGraph} could not be restored - */ - private void tryRestoreExecutionGraphFromSavepoint( - ExecutionGraph executionGraphToRestore, - SavepointRestoreSettings savepointRestoreSettings) - throws Exception { - if (savepointRestoreSettings.restoreSavepoint()) { - final CheckpointCoordinator checkpointCoordinator = - executionGraphToRestore.getCheckpointCoordinator(); - if (checkpointCoordinator != null) { - checkpointCoordinator.restoreSavepoint( - savepointRestoreSettings.getRestorePath(), - savepointRestoreSettings.allowNonRestoredState(), - executionGraphToRestore.getAllVertices(), - userCodeClassLoader); - } - } - } - - @Override public ArchivedExecutionGraph getArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { return ArchivedExecutionGraph.createFromInitializingJob( @@ -813,6 +708,125 @@ public class AdaptiveScheduler } @Override + public void goToCreatingExecutionGraph() { + CompletableFuture<ExecutionGraph> executionGraphFuture; + + try { + final ExecutionGraph executionGraph = createExecutionGraphWithAvailableResources(); + executionGraphFuture = CompletableFuture.completedFuture(executionGraph); + } catch (Exception exception) { + executionGraphFuture = FutureUtils.completedExceptionally(exception); + } + + transitionToState(new CreatingExecutionGraph.Factory(this, executionGraphFuture, LOG)); + } + + ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception { + final ParallelismAndResourceAssignments parallelismAndResourceAssignments = + determineParallelismAndAssignResources(slotAllocator); + + JobGraph adjustedJobGraph = jobInformation.copyJobGraph(); + for (JobVertex vertex : adjustedJobGraph.getVertices()) { + vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID())); + } + + final ExecutionGraph executionGraph = createExecutionGraphAndRestoreState(adjustedJobGraph); + + executionGraph.start(componentMainThreadExecutor); + executionGraph.transitionToRunning(); + + executionGraph.setInternalTaskFailuresListener( + new UpdateSchedulerNgOnInternalFailuresListener(this)); + + for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) { + final LogicalSlot assignedSlot = + parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID()); + executionVertex + .getCurrentExecutionAttempt() + .registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false); + executionVertex.tryAssignResource(assignedSlot); + } + return executionGraph; + } + + private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph) + throws Exception { + ExecutionDeploymentListener executionDeploymentListener = + new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker); + ExecutionStateUpdateListener executionStateUpdateListener = + (execution, newState) -> { + if (newState.isTerminal()) { + executionDeploymentTracker.stopTrackingDeploymentOf(execution); + } + }; + + final ExecutionGraph newExecutionGraph = + DefaultExecutionGraphBuilder.buildGraph( + adjustedJobGraph, + configuration, + futureExecutor, + ioExecutor, + userCodeClassLoader, + completedCheckpointStore, + checkpointsCleaner, + checkpointIdCounter, + rpcTimeout, + jobManagerJobMetricGroup, + blobWriter, + LOG, + shuffleMaster, + partitionTracker, + TaskDeploymentDescriptorFactory.PartitionLocationConstraint + .MUST_BE_KNOWN, // AdaptiveScheduler only supports streaming jobs + executionDeploymentListener, + executionStateUpdateListener, + initializationTimestamp, + vertexAttemptNumberStore); + + final CheckpointCoordinator checkpointCoordinator = + newExecutionGraph.getCheckpointCoordinator(); + + if (checkpointCoordinator != null) { + // check whether we find a valid checkpoint + if (!checkpointCoordinator.restoreInitialCheckpointIfPresent( + new HashSet<>(newExecutionGraph.getAllVertices().values()))) { + + // check whether we can restore from a savepoint + tryRestoreExecutionGraphFromSavepoint( + newExecutionGraph, adjustedJobGraph.getSavepointRestoreSettings()); + } + } + + return newExecutionGraph; + } + + /** + * Tries to restore the given {@link ExecutionGraph} from the provided {@link + * SavepointRestoreSettings}, iff checkpointing is enabled. + * + * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored + * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about + * the savepoint to restore from + * @throws Exception if the {@link ExecutionGraph} could not be restored + */ + private void tryRestoreExecutionGraphFromSavepoint( + ExecutionGraph executionGraphToRestore, + SavepointRestoreSettings savepointRestoreSettings) + throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = + executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + checkpointCoordinator.restoreSavepoint( + savepointRestoreSettings.getRestorePath(), + savepointRestoreSettings.allowNonRestoredState(), + executionGraphToRestore.getAllVertices(), + userCodeClassLoader); + } + } + } + + @Override public boolean canScaleUp(ExecutionGraph executionGraph) { int availableSlots = declarativeSlotPool.getFreeSlotsInformation().size(); @@ -936,6 +950,11 @@ public class AdaptiveScheduler () -> runIfState(expectedState, action), delay.toMillis(), TimeUnit.MILLISECONDS); } + @Override + public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(ExecutionGraph executionGraph) { + return CreatingExecutionGraph.AssignmentResult.success(executionGraph); + } + // ---------------------------------------------------------------- /** Note: Do not call this method from a State constructor or State#onLeave. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java new file mode 100644 index 0000000..b6f8df4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +/** + * State which waits for the creation of the {@link ExecutionGraph}. If the creation fails, then the + * state transitions to {@link Finished}. If the creation succeeds, then the system tries to assign + * the required slots. If the set of available slots has changed so that the created {@link + * ExecutionGraph} cannot be executed, the state transitions back into {@link WaitingForResources}. + * If there are enough slots for the {@link ExecutionGraph} to run, the state transitions to {@link + * Executing}. + */ +public class CreatingExecutionGraph implements State { + + private final Context context; + + private final Logger log; + + public CreatingExecutionGraph( + Context context, CompletableFuture<ExecutionGraph> executionGraphFuture, Logger log) { + this.context = context; + this.log = log; + + FutureUtils.assertNoException( + executionGraphFuture.handle( + (executionGraph, throwable) -> { + context.runIfState( + this, + () -> handleExecutionGraphCreation(executionGraph, throwable), + Duration.ZERO); + return null; + })); + } + + private void handleExecutionGraphCreation( + @Nullable ExecutionGraph executionGraph, @Nullable Throwable throwable) { + if (throwable != null) { + log.info( + "Failed to go from {} to {} because the ExecutionGraph creation failed.", + CreatingExecutionGraph.class.getSimpleName(), + Executing.class.getSimpleName(), + throwable); + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, throwable)); + } else { + final AssignmentResult result = context.tryToAssignSlots(executionGraph); + + if (result.isSuccess()) { + context.goToExecuting(result.getExecutionGraph()); + } else { + context.goToWaitingForResources(); + } + } + } + + @Override + public void cancel() { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); + } + + @Override + public void suspend(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); + } + + @Override + public JobStatus getJobStatus() { + return JobStatus.INITIALIZING; + } + + @Override + public ArchivedExecutionGraph getJob() { + return context.getArchivedExecutionGraph(getJobStatus(), null); + } + + @Override + public void handleGlobalFailure(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); + } + + @Override + public Logger getLogger() { + return log; + } + + /** Context for the {@link CreatingExecutionGraph} state. */ + interface Context { + + /** + * Transitions into the {@link Finished} state. + * + * @param archivedExecutionGraph archivedExecutionGraph representing the final job state + */ + void goToFinished(ArchivedExecutionGraph archivedExecutionGraph); + + /** + * Transitions into the {@link Executing} state. + * + * @param executionGraph executionGraph which is passed to the {@link Executing} state + */ + void goToExecuting(ExecutionGraph executionGraph); + + /** Transitions into the {@link WaitingForResources} state. */ + void goToWaitingForResources(); + + /** + * Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can + * be null if there is no failure. + * + * @param jobStatus jobStatus to initialize the {@link ArchivedExecutionGraph} with + * @param cause cause describing a failure cause; {@code null} if there is none + * @return the created {@link ArchivedExecutionGraph} + */ + ArchivedExecutionGraph getArchivedExecutionGraph( + JobStatus jobStatus, @Nullable Throwable cause); + + /** + * Runs the given action after a delay if the state at this time equals the expected state. + * + * @param expectedState expectedState describes the required state at the time of running + * the action + * @param action action to run if the expected state equals the actual state + * @param delay delay after which to run the action + */ + void runIfState(State expectedState, Runnable action, Duration delay); + + /** + * Try to assign slots to the created {@link ExecutionGraph}. If it is possible, then this + * method returns a successful {@link AssignmentResult} which contains the assigned {@link + * ExecutionGraph}. If not, then the assignment result is a failure. + * + * @param executionGraph executionGraph to assign slots to + * @return {@link AssignmentResult} representing the result of the assignment + */ + AssignmentResult tryToAssignSlots(ExecutionGraph executionGraph); + } + + /** + * Class representing the assignment result of the slots to the {@link ExecutionGraph}. The + * assignment is either successful or not possible. If it is successful, the assignment also + * contains the assigned {@link ExecutionGraph}. + */ + static final class AssignmentResult { + + private static final AssignmentResult NOT_POSSIBLE = new AssignmentResult(null); + + @Nullable private final ExecutionGraph executionGraph; + + private AssignmentResult(@Nullable ExecutionGraph executionGraph) { + this.executionGraph = executionGraph; + } + + boolean isSuccess() { + return executionGraph != null; + } + + ExecutionGraph getExecutionGraph() { + Preconditions.checkState( + isSuccess(), "Can only return the ExecutionGraph if it is a success."); + return executionGraph; + } + + static AssignmentResult success(ExecutionGraph executionGraph) { + return new AssignmentResult( + Preconditions.checkNotNull( + executionGraph, + "AssignmentResult.success expects a non-null ExecutionGraph.")); + } + + static AssignmentResult notPossible() { + return NOT_POSSIBLE; + } + } + + /** Factory for the {@link CreatingExecutionGraph} state. */ + static class Factory implements StateFactory<CreatingExecutionGraph> { + + private final Context context; + + private final CompletableFuture<ExecutionGraph> executionGraphFuture; + + private final Logger log; + + Factory( + Context context, + CompletableFuture<ExecutionGraph> executionGraphFuture, + Logger log) { + this.context = context; + this.executionGraphFuture = executionGraphFuture; + this.log = log; + } + + @Override + public Class<CreatingExecutionGraph> getStateClass() { + return CreatingExecutionGraph.class; + } + + @Override + public CreatingExecutionGraph getState() { + return new CreatingExecutionGraph(context, executionGraphFuture, log); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java index 6f118c0..848d729 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.Preconditions; @@ -101,19 +100,7 @@ class WaitingForResources implements State, ResourceConsumer { } private void createExecutionGraphWithAvailableResources() { - try { - final ExecutionGraph executionGraph = - context.createExecutionGraphWithAvailableResources(); - - context.goToExecuting(executionGraph); - } catch (Exception exception) { - log.info( - "Failed to go from {} to {} because the ExecutionGraph creation failed.", - WaitingForResources.class.getSimpleName(), - Executing.class.getSimpleName(), - exception); - context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, exception)); - } + context.goToCreatingExecutionGraph(); } /** Context of the {@link WaitingForResources} state. */ @@ -126,12 +113,8 @@ class WaitingForResources implements State, ResourceConsumer { */ void goToFinished(ArchivedExecutionGraph archivedExecutionGraph); - /** - * Transitions into the {@link Executing} state. - * - * @param executionGraph executionGraph which is passed to the {@link Executing} state - */ - void goToExecuting(ExecutionGraph executionGraph); + /** Transitions into the {@link CreatingExecutionGraph} state. */ + void goToCreatingExecutionGraph(); /** * Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can @@ -153,14 +136,6 @@ class WaitingForResources implements State, ResourceConsumer { boolean hasEnoughResources(ResourceCounter desiredResources); /** - * Creates an {@link ExecutionGraph} with the available resources. - * - * @return the created {@link ExecutionGraph} - * @throws Exception if the creation of the {@link ExecutionGraph} fails - */ - ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception; - - /** * Runs the given action after a delay if the state at this time equals the expected state. * * @param expectedState expectedState describes the required state at the time of running diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index ec4d4b2..4958a37 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -479,7 +479,7 @@ public class AdaptiveSchedulerTest extends TestLogger { createSlotOffersForResourceRequirements( ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM))); - assertThat(scheduler.getState(), instanceOf(Executing.class)); + assertThat(scheduler.getState(), instanceOf(CreatingExecutionGraph.class)); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java new file mode 100644 index 0000000..3d467fb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link CreatingExecutionGraph} state. */ +public class CreatingExecutionGraphTest extends TestLogger { + + @Test + public void testCancelTransitionsToFinished() throws Exception { + try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph(context, new CompletableFuture<>(), log); + + context.setExpectFinished( + archivedExecutionGraph -> + assertThat(archivedExecutionGraph.getState(), is(JobStatus.CANCELED))); + + creatingExecutionGraph.cancel(); + } + } + + @Test + public void testSuspendTransitionsToFinished() throws Exception { + try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph(context, new CompletableFuture<>(), log); + + context.setExpectFinished( + archivedExecutionGraph -> + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED))); + + creatingExecutionGraph.suspend(new FlinkException("Job has been suspended.")); + } + } + + @Test + public void testGlobalFailureTransitionsToFinished() throws Exception { + try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph(context, new CompletableFuture<>(), log); + + context.setExpectFinished( + archivedExecutionGraph -> + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED))); + + creatingExecutionGraph.handleGlobalFailure(new FlinkException("Test exception")); + } + } + + @Test + public void testFailedExecutionGraphCreationTransitionsToFinished() throws Exception { + try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { + final CompletableFuture<ExecutionGraph> executionGraphFuture = + new CompletableFuture<>(); + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph(context, executionGraphFuture, log); + + context.setExpectFinished( + archivedExecutionGraph -> + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED))); + + executionGraphFuture.completeExceptionally(new FlinkException("Test exception")); + } + } + + @Test + public void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() throws Exception { + try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { + final CompletableFuture<ExecutionGraph> executionGraphFuture = + new CompletableFuture<>(); + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph(context, executionGraphFuture, log); + + context.setTryToAssignSlotsFunction( + ignored -> CreatingExecutionGraph.AssignmentResult.notPossible()); + context.setExpectWaitingForResources(); + + executionGraphFuture.complete(new StateTrackingMockExecutionGraph()); + } + } + + @Test + public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exception { + try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) { + final CompletableFuture<ExecutionGraph> executionGraphFuture = + new CompletableFuture<>(); + final CreatingExecutionGraph creatingExecutionGraph = + new CreatingExecutionGraph(context, executionGraphFuture, log); + + final StateTrackingMockExecutionGraph executionGraph = + new StateTrackingMockExecutionGraph(); + + context.setTryToAssignSlotsFunction(CreatingExecutionGraph.AssignmentResult::success); + context.setExpectedExecuting( + actualExecutionGraph -> + assertThat(actualExecutionGraph, sameInstance(executionGraph))); + + executionGraphFuture.complete(executionGraph); + } + } + + static class MockCreatingExecutionGraphContext + implements CreatingExecutionGraph.Context, AutoCloseable { + private final StateValidator<ArchivedExecutionGraph> finishedStateValidator = + new StateValidator<>("Finished"); + private final StateValidator<Void> waitingForResourcesStateValidator = + new StateValidator<>("WaitingForResources"); + private final StateValidator<ExecutionGraph> executingStateValidator = + new StateValidator<>("Executing"); + + private Function<ExecutionGraph, CreatingExecutionGraph.AssignmentResult> + tryToAssignSlotsFunction = CreatingExecutionGraph.AssignmentResult::success; + + private boolean hadStateTransitionHappened = false; + + public void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) { + finishedStateValidator.expectInput(asserter); + } + + public void setExpectWaitingForResources() { + waitingForResourcesStateValidator.expectInput((none) -> {}); + } + + public void setExpectedExecuting(Consumer<ExecutionGraph> asserter) { + executingStateValidator.expectInput(asserter); + } + + public void setTryToAssignSlotsFunction( + Function<ExecutionGraph, CreatingExecutionGraph.AssignmentResult> + tryToAssignSlotsFunction) { + this.tryToAssignSlotsFunction = tryToAssignSlotsFunction; + } + + @Override + public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { + finishedStateValidator.validateInput(archivedExecutionGraph); + hadStateTransitionHappened = true; + } + + @Override + public void goToExecuting(ExecutionGraph executionGraph) { + executingStateValidator.validateInput(executionGraph); + hadStateTransitionHappened = true; + } + + @Override + public ArchivedExecutionGraph getArchivedExecutionGraph( + JobStatus jobStatus, @Nullable Throwable cause) { + return ArchivedExecutionGraph.createFromInitializingJob( + new JobID(), "testJob", jobStatus, cause, 0L); + } + + @Override + public void runIfState(State expectedState, Runnable action, Duration delay) { + if (!hadStateTransitionHappened) { + action.run(); + } + } + + @Override + public CreatingExecutionGraph.AssignmentResult tryToAssignSlots( + ExecutionGraph executionGraph) { + return tryToAssignSlotsFunction.apply(executionGraph); + } + + @Override + public void goToWaitingForResources() { + waitingForResourcesStateValidator.validateInput(null); + hadStateTransitionHappened = true; + } + + @Override + public void close() throws Exception { + finishedStateValidator.close(); + waitingForResourcesStateValidator.close(); + executingStateValidator.close(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java index 05135bb..8586551 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java @@ -22,13 +22,9 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.util.ResourceCounter; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; -import org.apache.flink.util.function.SupplierWithException; import org.junit.Test; @@ -53,11 +49,11 @@ public class WaitingForResourcesTest extends TestLogger { /** WaitingForResources is transitioning to Executing if there are enough resources. */ @Test - public void testTransitionToExecuting() throws Exception { + public void testTransitionToCreatingExecutionGraph() throws Exception { try (MockContext ctx = new MockContext()) { ctx.setHasEnoughResources(() -> true); - ctx.setExpectExecuting(assertNonNull()); + ctx.setExpectCreatingExecutionGraph(); new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO); // run delayed actions @@ -71,31 +67,6 @@ public class WaitingForResourcesTest extends TestLogger { } @Test - public void testTransitionToFinishedOnExecutionGraphInitializationFailure() throws Exception { - try (MockContext ctx = new MockContext()) { - ctx.setHasEnoughResources(() -> true); - ctx.setCreateExecutionGraphWithAvailableResources( - () -> { - throw new RuntimeException("Test exception"); - }); - - ctx.setExpectFinished( - (archivedExecutionGraph -> { - assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); - })); - - new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO); - - // run delayed actions - for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) { - if (!ctx.hasStateTransition()) { - scheduledRunnable.runAction(); - } - } - } - } - - @Test public void testNotEnoughResources() throws Exception { try (MockContext ctx = new MockContext()) { ctx.setHasEnoughResources(() -> false); @@ -114,7 +85,7 @@ public class WaitingForResourcesTest extends TestLogger { WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO); ctx.setHasEnoughResources(() -> true); // make resources available - ctx.setExpectExecuting(assertNonNull()); + ctx.setExpectCreatingExecutionGraph(); wfr.notifyNewResourcesAvailable(); // .. and notify } } @@ -126,7 +97,7 @@ public class WaitingForResourcesTest extends TestLogger { WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO); - ctx.setExpectExecuting(assertNonNull()); + ctx.setExpectCreatingExecutionGraph(); // immediately execute all scheduled runnables assertThat(ctx.getScheduledRunnables().size(), greaterThan(0)); @@ -195,15 +166,12 @@ public class WaitingForResourcesTest extends TestLogger { private static class MockContext implements WaitingForResources.Context, AutoCloseable { - private final StateValidator<ExecutionGraph> executingStateValidator = + private final StateValidator<Void> creatingExecutionGraphStateValidator = new StateValidator<>("executing"); private final StateValidator<ArchivedExecutionGraph> finishedStateValidator = new StateValidator<>("finished"); private Supplier<Boolean> hasEnoughResourcesSupplier = () -> false; - private SupplierWithException<ExecutionGraph, FlinkException> - createExecutionGraphWithAvailableResources = - () -> TestingDefaultExecutionGraphBuilder.newBuilder().build(); private final List<ScheduledRunnable> scheduledRunnables = new ArrayList<>(); private boolean hasStateTransition = false; @@ -215,22 +183,17 @@ public class WaitingForResourcesTest extends TestLogger { hasEnoughResourcesSupplier = sup; } - public void setCreateExecutionGraphWithAvailableResources( - SupplierWithException<ExecutionGraph, FlinkException> sup) { - this.createExecutionGraphWithAvailableResources = sup; - } - void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) { finishedStateValidator.expectInput(asserter); } - void setExpectExecuting(Consumer<ExecutionGraph> asserter) { - executingStateValidator.expectInput(asserter); + void setExpectCreatingExecutionGraph() { + creatingExecutionGraphStateValidator.expectInput(none -> {}); } @Override public void close() throws Exception { - executingStateValidator.close(); + creatingExecutionGraphStateValidator.close(); finishedStateValidator.close(); } @@ -249,11 +212,6 @@ public class WaitingForResourcesTest extends TestLogger { } @Override - public ExecutionGraph createExecutionGraphWithAvailableResources() throws FlinkException { - return createExecutionGraphWithAvailableResources.get(); - } - - @Override public void runIfState(State expectedState, Runnable action, Duration delay) { scheduledRunnables.add(new ScheduledRunnable(expectedState, action, delay)); } @@ -265,8 +223,8 @@ public class WaitingForResourcesTest extends TestLogger { } @Override - public void goToExecuting(ExecutionGraph executionGraph) { - executingStateValidator.validateInput(executionGraph); + public void goToCreatingExecutionGraph() { + creatingExecutionGraphStateValidator.validateInput(null); hasStateTransition = true; }
