zentol commented on a change in pull request #18761:
URL: https://github.com/apache/flink/pull/18761#discussion_r806819042
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void
testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
}
}
+ @Test
+ public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState()
throws Exception {
+ try (MockCreatingExecutionGraphContext context = new
MockCreatingExecutionGraphContext()) {
+ final
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+ executionGraphWithVertexParallelismFuture = new
CompletableFuture<>();
+ final AtomicReference<GlobalFailureHandler>
operatorCoordinatorGlobalFailureHandlerRef =
+ new AtomicReference<>();
+ new CreatingExecutionGraph(
+ context,
+ executionGraphWithVertexParallelismFuture,
+ log,
+ (executionGraph, errorHandler) -> {
+
operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+ return new TestingOperatorCoordinatorHandler();
+ });
+
+ final StateTrackingMockExecutionGraph executionGraph =
+ new StateTrackingMockExecutionGraph();
+
+
context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+ context.setExpectedExecuting(
+ ignored -> {
+ // This just lets us transition to executing.
+ });
+
+ final AtomicBoolean contextGlobalFailureHandlerCalled = new
AtomicBoolean(false);
+ context.setGlobalFailureHandler(
+ t -> {
+ contextGlobalFailureHandlerCalled.set(true);
+ });
+
+ executionGraphWithVertexParallelismFuture.complete(
+
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+ executionGraph, new TestingVertexParallelism()));
+
+ operatorCoordinatorGlobalFailureHandlerRef
+ .get()
+ .handleGlobalFailure(new RuntimeException("Test."));
+ assertTrue(contextGlobalFailureHandlerCalled.get());
Review comment:
This is what I meant in
https://github.com/apache/flink/pull/18761#discussion_r806234796.
```suggestion
executionGraphWithVertexParallelismFuture.complete(
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
executionGraph, new TestingVertexParallelism()));
assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]