dmvk commented on a change in pull request #18761:
URL: https://github.com/apache/flink/pull/18761#discussion_r805959938
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -205,6 +218,14 @@ AssignmentResult tryToAssignSlots(
* @return the main thread executor
*/
ComponentMainThreadExecutor getMainThreadExecutor();
+
+ /**
+ * Get a {@link State} the {@link AdaptiveScheduler} is currently in. This is primarily
+ * intended for callbacks that need to outlive a state transition.
+ *
+ * @return The current state of the {@link AdaptiveScheduler}.
+ */
+ State getEffectiveState();
Review comment:
I did that in the first iteration, but it seemed confusing to have both
`handleGlobalFailure` and `context.handleGlobalFailure` available in the
class, with no clear indication of how they differ. This felt a bit more
explicit for the reader.
WDYT?
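
For readers following the thread, here is a minimal sketch of why a callback that outlives a state transition should resolve the state when it is invoked rather than when it is created. The `Scheduler` and `State` types below are hypothetical stand-ins written for this illustration; they are not Flink's `AdaptiveScheduler` or `State`, and only the method names `getEffectiveState` and `handleGlobalFailure` are borrowed from the diff above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EffectiveStateSketch {

    /** Hypothetical stand-in for a scheduler state (not Flink's State interface). */
    interface State {
        void handleGlobalFailure(Throwable cause);
    }

    /** Hypothetical stand-in for the scheduler that owns the current state. */
    static final class Scheduler {
        private State current;
        final List<String> log = new ArrayList<>();

        State getEffectiveState() {
            return current;
        }

        void transitionTo(String stateName) {
            // Each state records failures under its own name so the demo can
            // show which state ended up handling the failure.
            current = cause -> log.add(stateName + ":" + cause.getMessage());
        }
    }

    /** Builds a callback that looks up the state at call time, not at creation time. */
    static Consumer<Throwable> failureCallback(Scheduler scheduler) {
        return cause -> scheduler.getEffectiveState().handleGlobalFailure(cause);
    }

    static List<String> demo() {
        Scheduler scheduler = new Scheduler();
        scheduler.transitionTo("CreatingExecutionGraph");
        Consumer<Throwable> callback = failureCallback(scheduler);

        // The callback was created in one state but fires after a transition.
        scheduler.transitionTo("Executing");
        callback.accept(new RuntimeException("Test."));
        return scheduler.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the failure is recorded by the state that is current when the callback fires, which is the behaviour the new test in this PR checks for.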
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
actualExecutionGraph ->
assertThat(actualExecutionGraph,
sameInstance(executionGraph)));
- executionGraphWithvertexParallelismFuture.complete(
+ final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+ context.setCreateExecutingStateFunction(
+ () ->
+ new AdaptiveSchedulerTest.DummyState() {
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ handledFailureRef.set(cause);
+ }
+ });
+
+ executionGraphWithVertexParallelismFuture.complete(
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
executionGraph, new TestingVertexParallelism()));
+
+ final Throwable t = new RuntimeException("Test.");
+ errorHandlerRef.get().accept(t);
Review comment:
👍
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -135,10 +139,16 @@ public void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() thro
public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exception {
try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
- executionGraphWithvertexParallelismFuture = new CompletableFuture<>();
- final CreatingExecutionGraph creatingExecutionGraph =
- new CreatingExecutionGraph(
- context, executionGraphWithvertexParallelismFuture, log);
+ executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+ final AtomicReference<Consumer<Throwable>> errorHandlerRef = new AtomicReference<>();
+ new CreatingExecutionGraph(context, executionGraphWithVertexParallelismFuture, log) {
+ @Override
+ OperatorCoordinatorHandler createOperatorCoordinatorHandler(
Review comment:
👍
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
}
}
+ @Test
+ public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+ try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+ final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+ executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+ final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+ new AtomicReference<>();
+ new CreatingExecutionGraph(
+ context,
+ executionGraphWithVertexParallelismFuture,
+ log,
+ (executionGraph, errorHandler) -> {
+ operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+ return new TestingOperatorCoordinatorHandler();
+ });
+
+ final StateTrackingMockExecutionGraph executionGraph =
+ new StateTrackingMockExecutionGraph();
+
+ context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+ context.setExpectedExecuting(
+ ignored -> {
+ // This just lets us transition to executing.
+ });
+
+ final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+ context.setGlobalFailureHandler(
Review comment:
I'd rather avoid that, because we can't do an assertion in the factory: it's
executed in the future callback that is wrapped in
`FutureUtils.assertNoException`, and I don't really want to change any more
logic because of that.
I can still assert on the reference, but that doesn't seem to make much of a
difference for readability.
WDYT?
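
To illustrate the point about assertions inside a future callback, here is a rough sketch using plain `java.util.concurrent.CompletableFuture`. It is an analogy only: it does not use Flink's `FutureUtils.assertNoException`, and the class and method names are made up for this example. With a plain `CompletableFuture`, an `AssertionError` thrown inside the callback does not reach the test thread; it ends up in the dependent future instead, which is why capturing a reference and asserting outside the callback is the safer pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CaptureThenAssertSketch {

    /** The callback only records the value; the caller asserts on it afterwards. */
    static String captureFromCallback() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> captured = new AtomicReference<>();

        future.thenAccept(captured::set);
        future.complete("handler");

        return captured.get();
    }

    /**
     * Throws inside the callback. complete() itself returns normally; the error
     * is stored in the dependent future rather than propagated to the caller.
     */
    static boolean assertionInsideCallbackStaysInFuture() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<Void> dependent =
                future.thenAccept(
                        value -> {
                            throw new AssertionError("raised inside the callback");
                        });

        future.complete("handler");

        return dependent.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(captureFromCallback());
        System.out.println(assertionInsideCallbackStaysInFuture());
    }
}
```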
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
}
}
+ @Test
+ public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+ try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+ final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+ executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+ final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+ new AtomicReference<>();
+ new CreatingExecutionGraph(
+ context,
+ executionGraphWithVertexParallelismFuture,
+ log,
+ (executionGraph, errorHandler) -> {
+ operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+ return new TestingOperatorCoordinatorHandler();
+ });
+
+ final StateTrackingMockExecutionGraph executionGraph =
+ new StateTrackingMockExecutionGraph();
+
+ context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+ context.setExpectedExecuting(
+ ignored -> {
+ // This just lets us transition to executing.
+ });
+
+ final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+ context.setGlobalFailureHandler(
+ t -> {
+ contextGlobalFailureHandlerCalled.set(true);
+ });
+
+ executionGraphWithVertexParallelismFuture.complete(
+ CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+ executionGraph, new TestingVertexParallelism()));
+
+ operatorCoordinatorGlobalFailureHandlerRef
+ .get()
+ .handleGlobalFailure(new RuntimeException("Test."));
+ assertTrue(contextGlobalFailureHandlerCalled.get());
Review comment:
👍 will change that