zentol commented on a change in pull request #18761:
URL: https://github.com/apache/flink/pull/18761#discussion_r805956575
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -205,6 +218,14 @@ AssignmentResult tryToAssignSlots(
* @return the main thread executor
*/
ComponentMainThreadExecutor getMainThreadExecutor();
+
+ /**
+ * Get a {@link State} the {@link AdaptiveScheduler} is currently in. This is primarily
+ * intended for callbacks that need to outlive a state transition.
+ *
+ * @return The current state of the {@link AdaptiveScheduler}.
+ */
+ State getEffectiveState();
Review comment:
Instead of exposing the state, why not expose the global failure handler?
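As a rough illustration of that idea, here is a minimal sketch. All types below are simplified, hypothetical stand-ins (`FailureSink`, `RecordingState`, `DelegatingScheduler`), not Flink's actual classes. It assumes the scheduler itself acts as the failure handler and forwards to whichever state is current when the failure arrives, so a long-lived callback never needs a reference to a state.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a global failure callback; not Flink's actual interface. */
interface FailureSink {
    void handleGlobalFailure(Throwable cause);
}

/** Hypothetical state that records the failures routed to it. */
final class RecordingState implements FailureSink {
    final List<Throwable> failures = new ArrayList<>();

    @Override
    public void handleGlobalFailure(Throwable cause) {
        failures.add(cause);
    }
}

/**
 * Hypothetical scheduler. It exposes itself as the failure sink and forwards
 * to the state that is current at the time of the call.
 */
final class DelegatingScheduler implements FailureSink {
    private FailureSink currentState;

    DelegatingScheduler(FailureSink initialState) {
        this.currentState = initialState;
    }

    void transitionTo(FailureSink newState) {
        this.currentState = newState;
    }

    @Override
    public void handleGlobalFailure(Throwable cause) {
        // Resolved at call time, so callbacks created earlier still reach the new state.
        currentState.handleGlobalFailure(cause);
    }
}
```

A callback that captured the scheduler as a `FailureSink` before a call to `transitionTo` would still deliver its failure to the state that is current afterwards.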
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -205,6 +218,14 @@ AssignmentResult tryToAssignSlots(
* @return the main thread executor
*/
ComponentMainThreadExecutor getMainThreadExecutor();
+
+ /**
+ * Get a {@link State} the {@link AdaptiveScheduler} is currently in. This is primarily
+ * intended for callbacks that need to outlive a state transition.
+ *
+ * @return The current state of the {@link AdaptiveScheduler}.
+ */
+ State getEffectiveState();
Review comment:
Besides that, I don't know whether the added "Effective" gives us
anything.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
actualExecutionGraph ->
assertThat(actualExecutionGraph, sameInstance(executionGraph)));
- executionGraphWithvertexParallelismFuture.complete(
+ final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+ context.setCreateExecutingStateFunction(
+ () ->
+ new AdaptiveSchedulerTest.DummyState() {
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ handledFailureRef.set(cause);
+ }
+ });
+
+ executionGraphWithVertexParallelismFuture.complete(
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
executionGraph, new TestingVertexParallelism()));
+
+ final Throwable t = new RuntimeException("Test.");
+ errorHandlerRef.get().accept(t);
Review comment:
This should be a separate test case. The test now contains more lines for
error handling, although that's not its primary purpose.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
actualExecutionGraph ->
assertThat(actualExecutionGraph, sameInstance(executionGraph)));
- executionGraphWithvertexParallelismFuture.complete(
+ final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+ context.setCreateExecutingStateFunction(
+ () ->
+ new AdaptiveSchedulerTest.DummyState() {
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ handledFailureRef.set(cause);
+ }
+ });
+
+ executionGraphWithVertexParallelismFuture.complete(
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
executionGraph, new TestingVertexParallelism()));
+
+ final Throwable t = new RuntimeException("Test.");
+ errorHandlerRef.get().accept(t);
+ assertEquals(
+ "OperatorCoordinator should use failure handler of the Executing state.",
+ t,
+ handledFailureRef.get());
Review comment:
Overall, this seems unsafe. There's no requirement that the error handlers
for different states are actually different instances.
What you want to test instead is that a failure in the coordinator calls into
the scheduler.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -135,10 +139,16 @@ public void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() thro
public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exception {
try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
- executionGraphWithvertexParallelismFuture = new CompletableFuture<>();
- final CreatingExecutionGraph creatingExecutionGraph =
- new CreatingExecutionGraph(
- context, executionGraphWithvertexParallelismFuture, log);
+ executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+ final AtomicReference<Consumer<Throwable>> errorHandlerRef = new AtomicReference<>();
+ new CreatingExecutionGraph(context, executionGraphWithVertexParallelismFuture, log) {
+ @Override
+ OperatorCoordinatorHandler createOperatorCoordinatorHandler(
Review comment:
I'd rather add an OperatorCoordinatorHandler factory argument to
CreatingExecutionGraph than resort to overriding the method in the test.
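A minimal sketch of the factory-argument approach, for illustration only. Every type here (`FailureCallback`, `CoordinatorHandler`, `CoordinatorHandlerFactory`, `CreatingStateSketch`) is a simplified, hypothetical stand-in rather than the real Flink class, and the actual constructor signature may well look different.

```java
/** Simplified stand-in for a global failure callback; not Flink's actual interface. */
interface FailureCallback {
    void handleGlobalFailure(Throwable cause);
}

/** Simplified stand-in for the handler that the state creates. */
interface CoordinatorHandler {}

/** Hypothetical factory, injected so tests can observe what the state passes in. */
@FunctionalInterface
interface CoordinatorHandlerFactory {
    CoordinatorHandler create(Object executionGraph, FailureCallback failureCallback);
}

/** Hypothetical state that takes the factory as a constructor argument. */
final class CreatingStateSketch {
    private final FailureCallback context;
    private final CoordinatorHandlerFactory factory;

    CreatingStateSketch(FailureCallback context, CoordinatorHandlerFactory factory) {
        this.context = context;
        this.factory = factory;
    }

    /** Called once the execution graph is available. */
    CoordinatorHandler onGraphCreated(Object executionGraph) {
        // The state hands its context to the factory as the failure callback.
        return factory.create(executionGraph, context);
    }
}
```

With this shape, production code would pass the real factory, while a test can pass a lambda that records its arguments and returns a dummy handler, so no anonymous subclass of the state is needed.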
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -149,9 +159,27 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
actualExecutionGraph ->
assertThat(actualExecutionGraph, sameInstance(executionGraph)));
- executionGraphWithvertexParallelismFuture.complete(
+ final AtomicReference<Throwable> handledFailureRef = new AtomicReference<>();
+ context.setCreateExecutingStateFunction(
+ () ->
+ new AdaptiveSchedulerTest.DummyState() {
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ handledFailureRef.set(cause);
Review comment:
This seems overly complicated; isn't it enough to know that it was
indeed called?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
}
}
+ @Test
+ public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+ try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+ final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+ executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+ final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+ new AtomicReference<>();
+ new CreatingExecutionGraph(
+ context,
+ executionGraphWithVertexParallelismFuture,
+ log,
+ (executionGraph, errorHandler) -> {
+ operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+ return new TestingOperatorCoordinatorHandler();
+ });
+
+ final StateTrackingMockExecutionGraph executionGraph =
+ new StateTrackingMockExecutionGraph();
+
+ context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+ context.setExpectedExecuting(
+ ignored -> {
+ // This just lets us transition to executing.
+ });
+
+ final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+ context.setGlobalFailureHandler(
Review comment:
Couldn't we just check that the errorHandler passed to the factory ==
context?
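To make the identity check concrete, here is a small sketch under the assumption that the state passes its context directly to the factory as the failure handler. All names (`GlobalFailureSink`, `MockContextSketch`, `IdentityCheckSketch`) are hypothetical stand-ins, not the actual test utilities.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Simplified stand-in for a global failure callback; not Flink's actual interface. */
interface GlobalFailureSink {
    void handleGlobalFailure(Throwable cause);
}

/** Hypothetical test context that doubles as the failure handler. */
final class MockContextSketch implements GlobalFailureSink {
    @Override
    public void handleGlobalFailure(Throwable cause) {
        // Intentionally empty: the identity check below never invokes it.
    }
}

final class IdentityCheckSketch {
    /** Mimics the state under test: it hands its context to the factory. */
    static void createState(MockContextSketch context, Consumer<GlobalFailureSink> factory) {
        factory.accept(context);
    }

    /** Returns true if the handler seen by the factory is the context itself. */
    static boolean factoryReceivesContext() {
        MockContextSketch context = new MockContextSketch();
        AtomicReference<GlobalFailureSink> handlerRef = new AtomicReference<>();
        createState(context, handlerRef::set);
        // Compare the captured handler, not the AtomicReference wrapper.
        return handlerRef.get() == context;
    }
}
```

Compared with setting a flag inside a stubbed handler, the reference comparison needs no failure to be triggered and no extra `AtomicBoolean`.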
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -155,6 +179,60 @@ public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
}
}
+ @Test
+ public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() throws Exception {
+ try (MockCreatingExecutionGraphContext context = new MockCreatingExecutionGraphContext()) {
+ final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+ executionGraphWithVertexParallelismFuture = new CompletableFuture<>();
+ final AtomicReference<GlobalFailureHandler> operatorCoordinatorGlobalFailureHandlerRef =
+ new AtomicReference<>();
+ new CreatingExecutionGraph(
+ context,
+ executionGraphWithVertexParallelismFuture,
+ log,
+ (executionGraph, errorHandler) -> {
+ operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+ return new TestingOperatorCoordinatorHandler();
+ });
+
+ final StateTrackingMockExecutionGraph executionGraph =
+ new StateTrackingMockExecutionGraph();
+
+ context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+ context.setExpectedExecuting(
+ ignored -> {
+ // This just lets us transition to executing.
+ });
+
+ final AtomicBoolean contextGlobalFailureHandlerCalled = new AtomicBoolean(false);
+ context.setGlobalFailureHandler(
+ t -> {
+ contextGlobalFailureHandlerCalled.set(true);
+ });
+
+ executionGraphWithVertexParallelismFuture.complete(
+ CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+ executionGraph, new TestingVertexParallelism()));
+
+ operatorCoordinatorGlobalFailureHandlerRef
+ .get()
+ .handleGlobalFailure(new RuntimeException("Test."));
+ assertTrue(contextGlobalFailureHandlerCalled.get());
Review comment:
This is what I meant in
https://github.com/apache/flink/pull/18761#discussion_r806234796.
```suggestion
executionGraphWithVertexParallelismFuture.complete(
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
executionGraph, new TestingVertexParallelism()));
assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context);
```