zentol commented on a change in pull request #18761:
URL: https://github.com/apache/flink/pull/18761#discussion_r805956575



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -205,6 +218,14 @@ AssignmentResult tryToAssignSlots(
          * @return the main thread executor
          */
         ComponentMainThreadExecutor getMainThreadExecutor();
+
+        /**
+         * Get a {@link State} the {@link AdaptiveScheduler} is currently in. This is primarily
+         * intended for callbacks that need to outlive a state transition.
+         *
+         * @return The current state of the {@link AdaptiveScheduler}.
+         */
+        State getEffectiveState();

Review comment:
       Instead of exposing the state, why not expose the global failure handler?
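
A minimal sketch of this suggestion, not the PR's actual code: if the scheduler (or its context) itself acts as the global failure handler and delegates to whichever state is current, a long-lived callback can hold on to the handler and never needs to see a state. Every type below (`GlobalFailureHandler` as a one-method interface, `SchedulerState`, `RecordingState`, `SchedulerSketch`) is a hypothetical stand-in, not Flink's real class.

```java
import java.util.ArrayList;
import java.util.List;

class ExposeHandlerSketch {
    /** Captures the handler before a state transition and fails afterwards. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SchedulerSketch scheduler =
                new SchedulerSketch(new RecordingState("CreatingExecutionGraph", log));

        // The long-lived callback captures only the handler, never a state.
        GlobalFailureHandler callback = scheduler;

        scheduler.transitionTo(new RecordingState("Executing", log));
        callback.handleGlobalFailure(new RuntimeException("Test."));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

/** Hypothetical stand-in for a handler of job-wide failures. */
interface GlobalFailureHandler {
    void handleGlobalFailure(Throwable cause);
}

/** Hypothetical scheduler state; each state decides how to react to a failure. */
interface SchedulerState extends GlobalFailureHandler {}

/** Hypothetical state that records the failures it receives under its name. */
class RecordingState implements SchedulerState {
    private final String name;
    private final List<String> log;

    RecordingState(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void handleGlobalFailure(Throwable cause) {
        log.add(name + ": " + cause.getMessage());
    }
}

/** Hypothetical scheduler: it is the handler and delegates to the current state. */
class SchedulerSketch implements GlobalFailureHandler {
    private SchedulerState current;

    SchedulerSketch(SchedulerState initial) {
        this.current = initial;
    }

    void transitionTo(SchedulerState next) {
        this.current = next;
    }

    @Override
    public void handleGlobalFailure(Throwable cause) {
        current.handleGlobalFailure(cause);
    }
}
```

In this sketch the failure is recorded by the state that is current when the callback fires, even though the callback was created while an earlier state was active.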

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -205,6 +218,14 @@ AssignmentResult tryToAssignSlots(
          * @return the main thread executor
          */
         ComponentMainThreadExecutor getMainThreadExecutor();
+
+        /**
+         * Get a {@link State} the {@link AdaptiveScheduler} is currently in. This is primarily
+         * intended for callbacks that need to outlive a state transition.
+         *
+         * @return The current state of the {@link AdaptiveScheduler}.
+         */
+        State getEffectiveState();

Review comment:
       Besides that, I don't know whether the added "Effective" in the method name gives us anything.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
                     actualExecutionGraph ->
                             assertThat(actualExecutionGraph, sameInstance(executionGraph)));
 
-            executionGraphWithvertexParallelismFuture.complete(
+            final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+            context.setCreateExecutingStateFunction(
+                    () ->
+                            new AdaptiveSchedulerTest.DummyState() {
+
+                                @Override
+                                public void handleGlobalFailure(Throwable cause) {
+                                    handledFailureRef.set(cause);
+                                }
+                            });
+
+            executionGraphWithVertexParallelismFuture.complete(
                     CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                             executionGraph, new TestingVertexParallelism()));
+
+            final Throwable t = new RuntimeException("Test.");
+            errorHandlerRef.get().accept(t);

Review comment:
       This should be a separate test case. The test now contains more lines for the error handling, although that's not its primary purpose.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
                     actualExecutionGraph ->
                             assertThat(actualExecutionGraph, sameInstance(executionGraph)));
 
-            executionGraphWithvertexParallelismFuture.complete(
+            final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+            context.setCreateExecutingStateFunction(
+                    () ->
+                            new AdaptiveSchedulerTest.DummyState() {
+
+                                @Override
+                                public void handleGlobalFailure(Throwable cause) {
+                                    handledFailureRef.set(cause);
+                                }
+                            });
+
+            executionGraphWithVertexParallelismFuture.complete(
                     CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                             executionGraph, new TestingVertexParallelism()));
+
+            final Throwable t = new RuntimeException("Test.");
+            errorHandlerRef.get().accept(t);
+            assertEquals(
+                    "OperatorCoordinator should use failure handler of the Executing state.",
+                    t,
+                    handledFailureRef.get());

Review comment:
       This overall seems unsafe. There's no requirement that the error handlers for different states are actually different instances.
   What you rather want to test is that a failure in the coordinator calls into the scheduler.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -135,10 +139,16 @@ public void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() thro
     public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exception {
         try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
             final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
-                    executionGraphWithvertexParallelismFuture = new CompletableFuture<>();
-            final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(
-                            context, executionGraphWithvertexParallelismFuture, log);
+                    executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+            final AtomicReference<Consumer<Throwable>> errorHandlerRef = new AtomicReference<>();
+            new CreatingExecutionGraph(context, executionGraphWithVertexParallelismFuture, log) {
+                @Override
+                OperatorCoordinatorHandler createOperatorCoordinatorHandler(

Review comment:
       I'd rather add an OperatorCoordinatorHandler factory argument to the CreatingExecutionGraph than resort to this.
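
A rough sketch of what a factory argument could look like, under the assumption that the state only needs to hand its error handler to whatever creates the coordinator handler. The names `CoordinatorHandler`, `CoordinatorHandlerFactory`, `StateWithFactory` and `onGraphCreated` are made up for illustration and do not mirror Flink's signatures.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class FactoryArgumentSketch {
    /** Returns true if the factory was given exactly the state's error handler. */
    static boolean factoryReceivesErrorHandler() {
        Consumer<Throwable> errorHandler = t -> {};
        AtomicReference<Consumer<Throwable>> captured = new AtomicReference<>();

        // The test injects a factory instead of subclassing the state.
        StateWithFactory state =
                new StateWithFactory(
                        errorHandler,
                        handler -> {
                            captured.set(handler);
                            return new CoordinatorHandler() {};
                        });

        state.onGraphCreated();
        return captured.get() == errorHandler;
    }

    public static void main(String[] args) {
        System.out.println(factoryReceivesErrorHandler());
    }
}

/** Hypothetical stand-in for the handler created for operator coordinators. */
interface CoordinatorHandler {}

/** Hypothetical factory that builds a handler from the given error handler. */
interface CoordinatorHandlerFactory {
    CoordinatorHandler create(Consumer<Throwable> errorHandler);
}

/** Hypothetical state that receives the factory through its constructor. */
class StateWithFactory {
    private final Consumer<Throwable> errorHandler;
    private final CoordinatorHandlerFactory factory;

    StateWithFactory(Consumer<Throwable> errorHandler, CoordinatorHandlerFactory factory) {
        this.errorHandler = errorHandler;
        this.factory = factory;
    }

    CoordinatorHandler onGraphCreated() {
        return factory.create(errorHandler);
    }
}
```

Compared with overriding a method in an anonymous subclass, the constructor argument keeps the test on the public surface of the class under test.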

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
                     actualExecutionGraph ->
                             assertThat(actualExecutionGraph, sameInstance(executionGraph)));
 
-            executionGraphWithvertexParallelismFuture.complete(
+            final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+            context.setCreateExecutingStateFunction(
+                    () ->
+                            new AdaptiveSchedulerTest.DummyState() {
+
+                                @Override
+                                public void handleGlobalFailure(Throwable cause) {
+                                    handledFailureRef.set(cause);

Review comment:
       This seems overly complicated; isn't it enough to know that it was indeed called?
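
For illustration only, recording just the fact that the handler ran could be as small as the following; `CalledFlagSketch` and its `Consumer<Throwable>` handler are placeholders rather than the test's real types.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class CalledFlagSketch {
    /** Invokes a failure handler once and reports whether it was reached. */
    static boolean handlerWasCalled() {
        AtomicBoolean called = new AtomicBoolean(false);

        // The handler ignores the cause; only the invocation is of interest.
        Consumer<Throwable> failureHandler = cause -> called.set(true);

        failureHandler.accept(new RuntimeException("Test."));
        return called.get();
    }

    public static void main(String[] args) {
        System.out.println(handlerWasCalled());
    }
}
```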

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
         }
     }
 
+    @Test
+    public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+        try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+            final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                    executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+            final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+                    new AtomicReference<>();
+            new CreatingExecutionGraph(
+                    context,
+                    executionGraphWithVertexParallelismFuture,
+                    log,
+                    (executionGraph, errorHandler) -> {
+                        operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+                        return new TestingOperatorCoordinatorHandler();
+                    });
+
+            final StateTrackingMockExecutionGraph executionGraph =
+                    new StateTrackingMockExecutionGraph();
+
+            context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+            context.setExpectedExecuting(
+                    ignored -> {
+                        // This just lets us transition to executing.
+                    });
+
+            final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+            context.setGlobalFailureHandler(

Review comment:
       Couldn't we just check that the errorHandler passed to the factory == context?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
         }
     }
 
+    @Test
+    public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+        try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+            final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                    executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+            final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+                    new AtomicReference<>();
+            new CreatingExecutionGraph(
+                    context,
+                    executionGraphWithVertexParallelismFuture,
+                    log,
+                    (executionGraph, errorHandler) -> {
+                        operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+                        return new TestingOperatorCoordinatorHandler();
+                    });
+
+            final StateTrackingMockExecutionGraph executionGraph =
+                    new StateTrackingMockExecutionGraph();
+
+            context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+            context.setExpectedExecuting(
+                    ignored -> {
+                        // This just lets us transition to executing.
+                    });
+
+            final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+            context.setGlobalFailureHandler(
+                    t -> {
+                        contextGlobalFailureHandlerCalled.set(true);
+                    });
+
+            executionGraphWithVertexParallelismFuture.complete(
+                    CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                            executionGraph, new TestingVertexParallelism()));
+
+            operatorCoordinatorGlobalFailureHandlerRef
+                    .get()
+                    .handleGlobalFailure(new RuntimeException("Test."));
+            assertTrue(contextGlobalFailureHandlerCalled.get());

Review comment:
       This is what I meant in https://github.com/apache/flink/pull/18761#discussion_r806234796.
   
   ```suggestion
               executionGraphWithVertexParallelismFuture.complete(
                       CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                               executionGraph, new TestingVertexParallelism()));
   
               assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context);
   ```



