This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 676e8e5528f99eb8ba5747f7489b0f02ee025dd6
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Feb 20 23:08:47 2023 +0000

    [FLINK-21450][runtime] Add previous ExecutionGraph to WaitingForResources AdaptiveScheduler state
    
    The previous ExecutionGraph will be used in a subsequent commit to allocate
    workloads more efficiently by taking previous allocations into account.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 12 ++++++----
 .../flink/runtime/scheduler/adaptive/Created.java  |  2 +-
 .../scheduler/adaptive/CreatingExecutionGraph.java | 17 +++++++++----
 .../runtime/scheduler/adaptive/Restarting.java     |  5 +++-
 .../scheduler/adaptive/StateTransitions.java       |  6 +++--
 .../scheduler/adaptive/WaitingForResources.java    | 28 +++++++++++++++++-----
 .../runtime/scheduler/adaptive/CreatedTest.java    |  3 ++-
 .../adaptive/CreatingExecutionGraphTest.java       | 23 +++++++++++-------
 .../runtime/scheduler/adaptive/RestartingTest.java |  2 +-
 .../adaptive/WaitingForResourcesTest.java          |  9 ++++---
 10 files changed, 76 insertions(+), 31 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4f7b559e86a..0bdbb44da7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -784,7 +784,7 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public void goToWaitingForResources() {
+    public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
         final ResourceCounter desiredResources = calculateDesiredResources();
         declarativeSlotPool.setResourceRequirements(desiredResources);
 
@@ -794,7 +794,8 @@ public class AdaptiveScheduler
                         LOG,
                         desiredResources,
                         this.initialResourceAllocationTimeout,
-                        this.resourceStabilizationTimeout));
+                        this.resourceStabilizationTimeout,
+                        previousExecutionGraph));
     }
 
     private ResourceCounter calculateDesiredResources() {
@@ -916,14 +917,17 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public void goToCreatingExecutionGraph() {
+    public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
         final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
                 executionGraphWithAvailableResourcesFuture =
                         createExecutionGraphWithAvailableResourcesAsync();
 
         transitionToState(
                 new CreatingExecutionGraph.Factory(
-                        this, executionGraphWithAvailableResourcesFuture, 
LOG));
+                        this,
+                        executionGraphWithAvailableResourcesFuture,
+                        LOG,
+                        previousExecutionGraph));
     }
 
     private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
index 1dd9f1e8f16..798eeda352e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
@@ -69,7 +69,7 @@ class Created implements State {
 
     /** Starts the scheduling by going into the {@link WaitingForResources} 
state. */
     void startScheduling() {
-        context.goToWaitingForResources();
+        context.goToWaitingForResources(null);
     }
 
     /** Context of the {@link Created} state. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index 0ec7437bb85..c87e58b2971 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -61,12 +61,15 @@ public class CreatingExecutionGraph implements State {
     private final Logger logger;
     private final OperatorCoordinatorHandlerFactory 
operatorCoordinatorHandlerFactory;
 
+    private final @Nullable ExecutionGraph previousExecutionGraph;
+
     public CreatingExecutionGraph(
             Context context,
             CompletableFuture<ExecutionGraphWithVertexParallelism>
                     executionGraphWithParallelismFuture,
             Logger logger,
-            OperatorCoordinatorHandlerFactory operatorCoordinatorFactory) {
+            OperatorCoordinatorHandlerFactory operatorCoordinatorFactory,
+            ExecutionGraph previousExecutionGraph1) {
         this.context = context;
         this.logger = logger;
         this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory;
@@ -82,6 +85,7 @@ public class CreatingExecutionGraph implements State {
                                     Duration.ZERO);
                             return null;
                         }));
+        previousExecutionGraph = previousExecutionGraph1;
     }
 
     private void handleExecutionGraphCreation(
@@ -141,7 +145,7 @@ public class CreatingExecutionGraph implements State {
             } else {
                 logger.debug(
                         "Failed to reserve and assign the required slots. 
Waiting for new resources.");
-                context.goToWaitingForResources();
+                context.goToWaitingForResources(previousExecutionGraph);
             }
         }
     }
@@ -300,16 +304,20 @@ public class CreatingExecutionGraph implements State {
         private final CompletableFuture<ExecutionGraphWithVertexParallelism>
                 executionGraphWithParallelismFuture;
 
+        private final @Nullable ExecutionGraph previousExecutionGraph;
+
         private final Logger log;
 
         Factory(
                 Context context,
                 CompletableFuture<ExecutionGraphWithVertexParallelism>
                         executionGraphWithParallelismFuture,
-                Logger log) {
+                Logger log,
+                @Nullable ExecutionGraph previousExecutionGraph) {
             this.context = context;
             this.executionGraphWithParallelismFuture = 
executionGraphWithParallelismFuture;
             this.log = log;
+            this.previousExecutionGraph = previousExecutionGraph;
         }
 
         @Override
@@ -323,7 +331,8 @@ public class CreatingExecutionGraph implements State {
                     context,
                     executionGraphWithParallelismFuture,
                     log,
-                    DefaultOperatorCoordinatorHandler::new);
+                    DefaultOperatorCoordinatorHandler::new,
+                    previousExecutionGraph);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index 125804959dc..f0a38035f44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -97,7 +97,10 @@ class Restarting extends StateWithExecutionGraph {
     void onGloballyTerminalState(JobStatus globallyTerminalState) {
         Preconditions.checkArgument(globallyTerminalState == 
JobStatus.CANCELED);
         goToWaitingForResourcesFuture =
-                context.runIfState(this, context::goToWaitingForResources, 
backoffTime);
+                context.runIfState(
+                        this,
+                        () -> 
context.goToWaitingForResources(getExecutionGraph()),
+                        backoffTime);
     }
 
     /** Context of the {@link Restarting} state. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java
index bd3f8a509c4..f6881be4dde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitions.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -58,7 +60,7 @@ public interface StateTransitions {
     interface ToCreatingExecutionGraph extends StateTransitions {
 
         /** Transitions into the {@link CreatingExecutionGraph} state. */
-        void goToCreatingExecutionGraph();
+        void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph);
     }
 
     /** Interface covering transition to the {@link Executing} state. */
@@ -164,6 +166,6 @@ public interface StateTransitions {
     interface ToWaitingForResources extends StateTransitions {
 
         /** Transitions into the {@link WaitingForResources} state. */
-        void goToWaitingForResources();
+        void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index 7081bb8980f..f102bcfc4ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.clock.Clock;
@@ -53,6 +54,9 @@ class WaitingForResources implements State, ResourceConsumer {
 
     @Nullable private ScheduledFuture<?> resourceTimeoutFuture;
 
+    @Nullable private final ExecutionGraph previousExecutionGraph;
+
+    @VisibleForTesting
     WaitingForResources(
             Context context,
             Logger log,
@@ -65,17 +69,18 @@ class WaitingForResources implements State, 
ResourceConsumer {
                 desiredResources,
                 initialResourceAllocationTimeout,
                 resourceStabilizationTimeout,
-                SystemClock.getInstance());
+                SystemClock.getInstance(),
+                null);
     }
 
-    @VisibleForTesting
     WaitingForResources(
             Context context,
             Logger log,
             ResourceCounter desiredResources,
             Duration initialResourceAllocationTimeout,
             Duration resourceStabilizationTimeout,
-            Clock clock) {
+            Clock clock,
+            @Nullable ExecutionGraph previousExecutionGraph) {
         this.context = Preconditions.checkNotNull(context);
         this.log = Preconditions.checkNotNull(log);
         this.desiredResources = Preconditions.checkNotNull(desiredResources);
@@ -97,6 +102,7 @@ class WaitingForResources implements State, ResourceConsumer 
{
                     context.runIfState(
                             this, this::resourceTimeout, 
initialResourceAllocationTimeout);
         }
+        this.previousExecutionGraph = previousExecutionGraph;
         context.runIfState(this, this::notifyNewResourcesAvailable, 
Duration.ZERO);
     }
 
@@ -175,7 +181,7 @@ class WaitingForResources implements State, 
ResourceConsumer {
     }
 
     private void createExecutionGraphWithAvailableResources() {
-        context.goToCreatingExecutionGraph();
+        context.goToCreatingExecutionGraph(previousExecutionGraph);
     }
 
     /** Context of the {@link WaitingForResources} state. */
@@ -227,18 +233,21 @@ class WaitingForResources implements State, 
ResourceConsumer {
         private final ResourceCounter desiredResources;
         private final Duration initialResourceAllocationTimeout;
         private final Duration resourceStabilizationTimeout;
+        @Nullable private final ExecutionGraph previousExecutionGraph;
 
         public Factory(
                 Context context,
                 Logger log,
                 ResourceCounter desiredResources,
                 Duration initialResourceAllocationTimeout,
-                Duration resourceStabilizationTimeout) {
+                Duration resourceStabilizationTimeout,
+                ExecutionGraph previousExecutionGraph) {
             this.context = context;
             this.log = log;
             this.desiredResources = desiredResources;
             this.initialResourceAllocationTimeout = 
initialResourceAllocationTimeout;
             this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+            this.previousExecutionGraph = previousExecutionGraph;
         }
 
         public Class<WaitingForResources> getStateClass() {
@@ -251,7 +260,14 @@ class WaitingForResources implements State, 
ResourceConsumer {
                     log,
                     desiredResources,
                     initialResourceAllocationTimeout,
-                    resourceStabilizationTimeout);
+                    resourceStabilizationTimeout,
+                    SystemClock.getInstance(),
+                    previousExecutionGraph);
         }
     }
+
+    @Nullable
+    public ExecutionGraph getPreviousExecutionGraph() {
+        return previousExecutionGraph;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
index 6887308f01b..e63513f16df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -122,7 +123,7 @@ public class CreatedTest extends TestLogger {
         }
 
         @Override
-        public void goToWaitingForResources() {
+        public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
             waitingForResourcesStateValidator.validateInput(null);
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index aa92d1dc089..85a4fe4f3ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -67,7 +67,8 @@ public class CreatingExecutionGraphTest extends TestLogger {
                             context,
                             new CompletableFuture<>(),
                             log,
-                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler);
+                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                            null);
 
             context.setExpectFinished(
                     archivedExecutionGraph ->
@@ -86,7 +87,8 @@ public class CreatingExecutionGraphTest extends TestLogger {
                             context,
                             new CompletableFuture<>(),
                             log,
-                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler);
+                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                            null);
 
             context.setExpectFinished(
                     archivedExecutionGraph ->
@@ -105,7 +107,8 @@ public class CreatingExecutionGraphTest extends TestLogger {
                             context,
                             new CompletableFuture<>(),
                             log,
-                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler);
+                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                            null);
 
             context.setExpectFinished(
                     archivedExecutionGraph ->
@@ -125,7 +128,8 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     context,
                     executionGraphWithVertexParallelismFuture,
                     log,
-                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler);
+                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                    null);
 
             context.setExpectFinished(
                     archivedExecutionGraph ->
@@ -146,7 +150,8 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     context,
                     executionGraphWithVertexParallelismFuture,
                     log,
-                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler);
+                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                    null);
 
             context.setTryToAssignSlotsFunction(
                     ignored -> 
CreatingExecutionGraph.AssignmentResult.notPossible());
@@ -167,7 +172,8 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     context,
                     executionGraphWithVertexParallelismFuture,
                     log,
-                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler);
+                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                    null);
 
             final StateTrackingMockExecutionGraph executionGraph =
                     new StateTrackingMockExecutionGraph();
@@ -197,7 +203,8 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     (executionGraph, errorHandler) -> {
                         
operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
                         return new TestingOperatorCoordinatorHandler();
-                    });
+                    },
+                    null);
 
             final StateTrackingMockExecutionGraph executionGraph =
                     new StateTrackingMockExecutionGraph();
@@ -318,7 +325,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
         }
 
         @Override
-        public void goToWaitingForResources() {
+        public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
             waitingForResourcesStateValidator.validateInput(null);
             hadStateTransitionHappened = true;
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index 94e7b75992d..264cb221f70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -174,7 +174,7 @@ public class RestartingTest extends TestLogger {
         public void archiveFailure(RootExceptionHistoryEntry failure) {}
 
         @Override
-        public void goToWaitingForResources() {
+        public void goToWaitingForResources(ExecutionGraph 
previousExecutionGraph) {
             waitingForResourcesStateValidator.validateInput(null);
             hadStateTransition = true;
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index 504a29e65ec..2c58f053b13 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.ScheduledTask;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.TestLogger;
@@ -157,7 +158,8 @@ public class WaitingForResourcesTest extends TestLogger {
                             RESOURCE_COUNTER,
                             initialResourceTimeout,
                             stabilizationTimeout,
-                            ctx.getClock());
+                            ctx.getClock(),
+                            null);
             // sufficient resources available
             ctx.setHasDesiredResources(() -> false);
             ctx.setHasSufficientResources(() -> true);
@@ -191,7 +193,8 @@ public class WaitingForResourcesTest extends TestLogger {
                             RESOURCE_COUNTER,
                             initialResourceTimeout,
                             stabilizationTimeout,
-                            ctx.getClock());
+                            ctx.getClock(),
+                            null);
 
             ctx.setHasDesiredResources(() -> false);
 
@@ -515,7 +518,7 @@ public class WaitingForResourcesTest extends TestLogger {
         }
 
         @Override
-        public void goToCreatingExecutionGraph() {
+        public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
             creatingExecutionGraphStateValidator.validateInput(null);
             hasStateTransition = true;
         }

Reply via email to