This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 676e8e5528f99eb8ba5747f7489b0f02ee025dd6 Author: Roman Khachatryan <[email protected]> AuthorDate: Mon Feb 20 23:08:47 2023 +0000 [FLINK-21450][runtime] Add previous ExecutionGraph to WaitingForResources AdaptiveScheduler state Previous ExecutionGraph will be used in a subsequent commit to allocate workloads more optimally by taking previous allocations into account. --- .../scheduler/adaptive/AdaptiveScheduler.java | 12 ++++++---- .../flink/runtime/scheduler/adaptive/Created.java | 2 +- .../scheduler/adaptive/CreatingExecutionGraph.java | 17 +++++++++---- .../runtime/scheduler/adaptive/Restarting.java | 5 +++- .../scheduler/adaptive/StateTransitions.java | 6 +++-- .../scheduler/adaptive/WaitingForResources.java | 28 +++++++++++++++++----- .../runtime/scheduler/adaptive/CreatedTest.java | 3 ++- .../adaptive/CreatingExecutionGraphTest.java | 23 +++++++++++------- .../runtime/scheduler/adaptive/RestartingTest.java | 2 +- .../adaptive/WaitingForResourcesTest.java | 9 ++++--- 10 files changed, 76 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 4f7b559e86a..0bdbb44da7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -784,7 +784,7 @@ public class AdaptiveScheduler } @Override - public void goToWaitingForResources() { + public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { final ResourceCounter desiredResources = calculateDesiredResources(); declarativeSlotPool.setResourceRequirements(desiredResources); @@ -794,7 +794,8 @@ public class AdaptiveScheduler LOG, desiredResources, this.initialResourceAllocationTimeout, - this.resourceStabilizationTimeout)); + this.resourceStabilizationTimeout, + previousExecutionGraph)); } private ResourceCounter calculateDesiredResources() { @@ -916,14 +917,17 @@ public class AdaptiveScheduler } @Override - public void goToCreatingExecutionGraph() { + public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> executionGraphWithAvailableResourcesFuture = createExecutionGraphWithAvailableResourcesAsync(); transitionToState( new CreatingExecutionGraph.Factory( - this, executionGraphWithAvailableResourcesFuture, LOG)); + this, + executionGraphWithAvailableResourcesFuture, + LOG, + previousExecutionGraph)); } private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java index 1dd9f1e8f16..798eeda352e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java @@ -69,7 +69,7 @@ class Created implements State { /** Starts the scheduling by going into the {@link WaitingForResources} state. */ void startScheduling() { - context.goToWaitingForResources(); + context.goToWaitingForResources(null); } /** Context of the {@link Created} state. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java index 0ec7437bb85..c87e58b2971 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -61,12 +61,15 @@ public class CreatingExecutionGraph implements State { private final Logger logger; private final OperatorCoordinatorHandlerFactory operatorCoordinatorHandlerFactory; + private final @Nullable ExecutionGraph previousExecutionGraph; + public CreatingExecutionGraph( Context context, CompletableFuture<ExecutionGraphWithVertexParallelism> executionGraphWithParallelismFuture, Logger logger, - OperatorCoordinatorHandlerFactory operatorCoordinatorFactory) { + OperatorCoordinatorHandlerFactory operatorCoordinatorFactory, + ExecutionGraph previousExecutionGraph1) { this.context = context; this.logger = logger; this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory; @@ -82,6 +85,7 @@ public class CreatingExecutionGraph implements State { Duration.ZERO); return null; })); + previousExecutionGraph = previousExecutionGraph1; } private void handleExecutionGraphCreation( @@ -141,7 +145,7 @@ public class CreatingExecutionGraph implements State { } else { logger.debug( "Failed to reserve and assign the required slots. Waiting for new resources."); - context.goToWaitingForResources(); + context.goToWaitingForResources(previousExecutionGraph); } } } @@ -300,16 +304,20 @@ public class CreatingExecutionGraph implements State { private final CompletableFuture<ExecutionGraphWithVertexParallelism> executionGraphWithParallelismFuture; + private final @Nullable ExecutionGraph previousExecutionGraph; + private final Logger log; Factory( Context context, CompletableFuture<ExecutionGraphWithVertexParallelism> executionGraphWithParallelismFuture, - Logger log) { + Logger log, + @Nullable ExecutionGraph previousExecutionGraph) { this.context = context; this.executionGraphWithParallelismFuture = executionGraphWithParallelismFuture; this.log = log; + this.previousExecutionGraph = previousExecutionGraph; } @Override @@ -323,7 +331,8 @@ public class CreatingExecutionGraph implements State { context, executionGraphWithParallelismFuture, log, - DefaultOperatorCoordinatorHandler::new); + DefaultOperatorCoordinatorHandler::new, + previousExecutionGraph); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index 125804959dc..f0a38035f44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -97,7 +97,10 @@ class Restarting extends StateWithExecutionGraph { void onGloballyTerminalState(JobStatus globallyTerminalState) { Preconditions.checkArgument(globallyTerminalState == JobStatus.CANCELED); goToWaitingForResourcesFuture = - context.runIfState(this, context::goToWaitingForResources, backoffTime); + context.runIfState( + this, + () -> context.goToWaitingForResources(getExecutionGraph()), + backoffTime); } /** Context of the {@link Restarting} state. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java index bd3f8a509c4..f6881be4dde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -58,7 +60,7 @@ public interface StateTransitions { interface ToCreatingExecutionGraph extends StateTransitions { /** Transitions into the {@link CreatingExecutionGraph} state. */ - void goToCreatingExecutionGraph(); + void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph); } /** Interface covering transition to the {@link Executing} state. */ @@ -164,6 +166,6 @@ public interface StateTransitions { interface ToWaitingForResources extends StateTransitions { /** Transitions into the {@link WaitingForResources} state. */ - void goToWaitingForResources(); + void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java index 7081bb8980f..f102bcfc4ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.Preconditions; import org.apache.flink.util.clock.Clock; @@ -53,6 +54,9 @@ class WaitingForResources implements State, ResourceConsumer { @Nullable private ScheduledFuture<?> resourceTimeoutFuture; + @Nullable private final ExecutionGraph previousExecutionGraph; + + @VisibleForTesting WaitingForResources( Context context, Logger log, @@ -65,17 +69,18 @@ class WaitingForResources implements State, ResourceConsumer { desiredResources, initialResourceAllocationTimeout, resourceStabilizationTimeout, - SystemClock.getInstance()); + SystemClock.getInstance(), + null); } - @VisibleForTesting WaitingForResources( Context context, Logger log, ResourceCounter desiredResources, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout, - Clock clock) { + Clock clock, + @Nullable ExecutionGraph previousExecutionGraph) { this.context = Preconditions.checkNotNull(context); this.log = Preconditions.checkNotNull(log); this.desiredResources = Preconditions.checkNotNull(desiredResources); @@ -97,6 +102,7 @@ class WaitingForResources implements State, ResourceConsumer { context.runIfState( this, this::resourceTimeout, initialResourceAllocationTimeout); } + this.previousExecutionGraph = previousExecutionGraph; context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO); } @@ -175,7 +181,7 @@ class WaitingForResources implements State, ResourceConsumer { } private void createExecutionGraphWithAvailableResources() { - context.goToCreatingExecutionGraph(); + context.goToCreatingExecutionGraph(previousExecutionGraph); } /** Context of the {@link WaitingForResources} state. */ @@ -227,18 +233,21 @@ class WaitingForResources implements State, ResourceConsumer { private final ResourceCounter desiredResources; private final Duration initialResourceAllocationTimeout; private final Duration resourceStabilizationTimeout; + @Nullable private final ExecutionGraph previousExecutionGraph; public Factory( Context context, Logger log, ResourceCounter desiredResources, Duration initialResourceAllocationTimeout, - Duration resourceStabilizationTimeout) { + Duration resourceStabilizationTimeout, + ExecutionGraph previousExecutionGraph) { this.context = context; this.log = log; this.desiredResources = desiredResources; this.initialResourceAllocationTimeout = initialResourceAllocationTimeout; this.resourceStabilizationTimeout = resourceStabilizationTimeout; + this.previousExecutionGraph = previousExecutionGraph; } public Class<WaitingForResources> getStateClass() { @@ -251,7 +260,14 @@ class WaitingForResources implements State, ResourceConsumer { log, desiredResources, initialResourceAllocationTimeout, - resourceStabilizationTimeout); + resourceStabilizationTimeout, + SystemClock.getInstance(), + previousExecutionGraph); } } + + @Nullable + public ExecutionGraph getPreviousExecutionGraph() { + return previousExecutionGraph; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java index 6887308f01b..e63513f16df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -122,7 +123,7 @@ public class CreatedTest extends TestLogger { } @Override - public void goToWaitingForResources() { + public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { waitingForResourcesStateValidator.validateInput(null); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java index aa92d1dc089..85a4fe4f3ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -67,7 +67,8 @@ public class CreatingExecutionGraphTest extends TestLogger { context, new CompletableFuture<>(), log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler); + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); context.setExpectFinished( archivedExecutionGraph -> @@ -86,7 +87,8 @@ public class CreatingExecutionGraphTest extends TestLogger { context, new CompletableFuture<>(), log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler); + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); context.setExpectFinished( archivedExecutionGraph -> @@ -105,7 +107,8 @@ public class CreatingExecutionGraphTest extends TestLogger { context, new CompletableFuture<>(), log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler); + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); context.setExpectFinished( archivedExecutionGraph -> @@ -125,7 +128,8 @@ public class CreatingExecutionGraphTest extends TestLogger { context, executionGraphWithVertexParallelismFuture, log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler); + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); context.setExpectFinished( archivedExecutionGraph -> @@ -146,7 +150,8 @@ public class CreatingExecutionGraphTest extends TestLogger { context, executionGraphWithVertexParallelismFuture, log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler); + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); context.setTryToAssignSlotsFunction( ignored -> CreatingExecutionGraph.AssignmentResult.notPossible()); @@ -167,7 +172,8 @@ public class CreatingExecutionGraphTest extends TestLogger { context, executionGraphWithVertexParallelismFuture, log, - CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler); + CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler, + null); final StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph(); @@ -197,7 +203,8 @@ public class CreatingExecutionGraphTest extends TestLogger { (executionGraph, errorHandler) -> { operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler); return new TestingOperatorCoordinatorHandler(); - }); + }, + null); final StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph(); @@ -318,7 +325,7 @@ public class CreatingExecutionGraphTest extends TestLogger { } @Override - public void goToWaitingForResources() { + public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { waitingForResourcesStateValidator.validateInput(null); hadStateTransitionHappened = true; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index 94e7b75992d..264cb221f70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -174,7 +174,7 @@ public class RestartingTest extends TestLogger { public void archiveFailure(RootExceptionHistoryEntry failure) {} @Override - public void goToWaitingForResources() { + public void goToWaitingForResources(ExecutionGraph previousExecutionGraph) { waitingForResourcesStateValidator.validateInput(null); hadStateTransition = true; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java index 504a29e65ec..2c58f053b13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java @@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.ScheduledTask; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.TestLogger; @@ -157,7 +158,8 @@ public class WaitingForResourcesTest extends TestLogger { RESOURCE_COUNTER, initialResourceTimeout, stabilizationTimeout, - ctx.getClock()); + ctx.getClock(), + null); // sufficient resources available ctx.setHasDesiredResources(() -> false); ctx.setHasSufficientResources(() -> true); @@ -191,7 +193,8 @@ public class WaitingForResourcesTest extends TestLogger { RESOURCE_COUNTER, initialResourceTimeout, stabilizationTimeout, - ctx.getClock()); + ctx.getClock(), + null); ctx.setHasDesiredResources(() -> false); @@ -515,7 +518,7 @@ public class WaitingForResourcesTest extends TestLogger { } @Override - public void goToCreatingExecutionGraph() { + public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { creatingExecutionGraphStateValidator.validateInput(null); hasStateTransition = true; }
