dmvk commented on a change in pull request #18761:
URL: https://github.com/apache/flink/pull/18761#discussion_r805959938



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -205,6 +218,14 @@ AssignmentResult tryToAssignSlots(
          * @return the main thread executor
          */
         ComponentMainThreadExecutor getMainThreadExecutor();
+
+        /**
+         * Get a {@link State} the {@link AdaptiveScheduler} is currently in. This is primarily
+         * intended for callbacks that need to outlive a state transition.
+         *
+         * @return The current state of the {@link AdaptiveScheduler}.
+         */
+        State getEffectiveState();

Review comment:
       I did that in the first iteration, but it seemed confusing to have both `handleGlobalFailure` and `context.handleGlobalFailure` available in the class, with no clear indication of how they differ. This felt a bit more explicit for the reader.
   
   WDYT?
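
   To illustrate the distinction, here is a minimal sketch with hypothetical stand-in types (`Scheduler`, `RecordingState`); it is not the actual Flink code. It assumes a callback is created in one state and fires after a transition: a handler bound to the creating state keeps reporting to that state, while a handler that looks up the effective state at call time reports to whichever state is current.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EffectiveStateSketch {

    /** Hypothetical stand-in for a scheduler state; not Flink's State interface. */
    interface State {
        void handleGlobalFailure(Throwable cause);
    }

    /** Hypothetical stand-in for the scheduler that owns the current state. */
    static final class Scheduler {
        private State current;

        void transitionTo(State next) {
            current = next;
        }

        State getEffectiveState() {
            return current;
        }
    }

    /** Records which state received which failure. */
    static final class RecordingState implements State {
        private final String name;
        private final List<String> log;

        RecordingState(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void handleGlobalFailure(Throwable cause) {
            log.add(name + ":" + cause.getMessage());
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Scheduler scheduler = new Scheduler();

        State creating = new RecordingState("creating", log);
        scheduler.transitionTo(creating);

        // Bound to the state that created the callback.
        Consumer<Throwable> captured = creating::handleGlobalFailure;
        // Resolves the state only when the callback actually fires.
        Consumer<Throwable> effective =
                t -> scheduler.getEffectiveState().handleGlobalFailure(t);

        // The callbacks outlive this transition.
        scheduler.transitionTo(new RecordingState("executing", log));

        captured.accept(new RuntimeException("a"));
        effective.accept(new RuntimeException("b"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

   In this sketch the first failure lands in the stale "creating" state and the second in "executing", which is the behaviour the Javadoc on `getEffectiveState()` describes for callbacks that outlive a state transition.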

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
                     actualExecutionGraph ->
                             assertThat(actualExecutionGraph, sameInstance(executionGraph)));
 
-            executionGraphWithvertexParallelismFuture.complete(
+            final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+            context.setCreateExecutingStateFunction(
+                    () ->
+                            new AdaptiveSchedulerTest.DummyState() {
+
+                                @Override
+                                public void handleGlobalFailure(Throwable cause) {
+                                    handledFailureRef.set(cause);
+                                }
+                            });
+
+            executionGraphWithVertexParallelismFuture.complete(
                     CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                             executionGraph, new TestingVertexParallelism()));
+
+            final Throwable t = new RuntimeException("Test.");
+            errorHandlerRef.get().accept(t);

Review comment:
       👍 

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -135,10 +139,16 @@ public void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() thro
     public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exception {
         try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
             final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
-                    executionGraphWithvertexParallelismFuture = new CompletableFuture<>();
-            final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(
-                            context, executionGraphWithvertexParallelismFuture, log);
+                    executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+            final AtomicReference<Consumer<Throwable>> errorHandlerRef = new AtomicReference<>();
+            new CreatingExecutionGraph(context, executionGraphWithVertexParallelismFuture, log) {
+                @Override
+                OperatorCoordinatorHandler createOperatorCoordinatorHandler(

Review comment:
       👍 

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
         }
     }
 
+    @Test
+    public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+        try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+            final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                    executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+            final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+                    new AtomicReference<>();
+            new CreatingExecutionGraph(
+                    context,
+                    executionGraphWithVertexParallelismFuture,
+                    log,
+                    (executionGraph, errorHandler) -> {
+                        operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+                        return new TestingOperatorCoordinatorHandler();
+                    });
+
+            final StateTrackingMockExecutionGraph executionGraph =
+                    new StateTrackingMockExecutionGraph();
+
+            context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+            context.setExpectedExecuting(
+                    ignored -> {
+                        // This just lets us transition to executing.
+                    });
+
+            final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+            context.setGlobalFailureHandler(

Review comment:
       I'd rather avoid that, because we can't make an assertion in the factory: it's executed in the future callback that is wrapped in `FutureUtils.assertNoException`. I don't really want to change any more logic because of that.
   
   I can still assert on the reference, but that doesn't seem to make much of a difference for readability.
   
   WDYT?
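
   A minimal sketch of the concern, using only the JDK's `CompletableFuture` rather than Flink's `FutureUtils` (the class and method names below are hypothetical): an assertion thrown inside a future callback does not propagate to the thread that completes the future, so it cannot fail the test directly. Recording the value in a reference and asserting after completion avoids that.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackAssertionSketch {

    /**
     * Throws inside the callback. complete() returns normally; the error only
     * shows up as an exceptionally completed dependent stage.
     */
    static boolean callbackFailureIsCaptured() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Void> stage =
                source.thenAccept(
                        value -> {
                            throw new AssertionError("inside callback");
                        });
        source.complete("graph"); // does not rethrow the AssertionError
        return stage.isCompletedExceptionally();
    }

    /** Records the value in the callback so the caller can assert on it afterwards. */
    static String recordThenAssert() {
        CompletableFuture<String> source = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();
        source.thenAccept(seen::set);
        source.complete("graph");
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackFailureIsCaptured());
        System.out.println(recordThenAssert());
    }
}
```

   With a wrapper that treats any exceptional completion as fatal, the first pattern would escalate instead of producing a normal test failure, which is why the test in this hunk captures the handler in an `AtomicReference` and asserts outside the callback.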

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
         }
     }
 
+    @Test
+    public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+        try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+            final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                    executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+            final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+                    new AtomicReference<>();
+            new CreatingExecutionGraph(
+                    context,
+                    executionGraphWithVertexParallelismFuture,
+                    log,
+                    (executionGraph, errorHandler) -> {
+                        operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+                        return new TestingOperatorCoordinatorHandler();
+                    });
+
+            final StateTrackingMockExecutionGraph executionGraph =
+                    new StateTrackingMockExecutionGraph();
+
+            context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+            context.setExpectedExecuting(
+                    ignored -> {
+                        // This just lets us transition to executing.
+                    });
+
+            final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+            context.setGlobalFailureHandler(
+                    t -> {
+                        contextGlobalFailureHandlerCalled.set(true);
+                    });
+
+            executionGraphWithVertexParallelismFuture.complete(
+                    CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                            executionGraph, new TestingVertexParallelism()));
+
+            operatorCoordinatorGlobalFailureHandlerRef
+                    .get()
+                    .handleGlobalFailure(new RuntimeException("Test."));
+            assertTrue(contextGlobalFailureHandlerCalled.get());

Review comment:
       👍 will change that



