tillrohrmann commented on a change in pull request #14879:
URL: https://github.com/apache/flink/pull/14879#discussion_r573705164
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredComponentMainThreadExecutor.java
##########
@@ -22,15 +22,11 @@
public class ManuallyTriggeredComponentMainThreadExecutor
extends ManuallyTriggeredScheduledExecutorService implements
ComponentMainThreadExecutor {
- private Thread executorThread = null;
+ private final Thread executorThread = Thread.currentThread();
Review comment:
Hmm I was more thinking about making this configurable for the
instantiator of this class. That way it would be obvious what a user of this
class can expect from this class.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
##########
@@ -155,11 +155,53 @@ public void
testTransitionToFinishedOnFailedExecutionGraph() throws Exception {
}
}
+ @Test
+ public void testEnsureOnlyGlobalTerminalStateCallbacksNoCall() throws
Exception {
+ // modify MockExecutingContext to disable the "runIfState()" check
+ MockExecutingContext ctx =
+ new MockExecutingContext() {
+ @Override
+ public void runIfState(State expectedState, Runnable
action) {
+ action.run();
+ }
+ };
Review comment:
This looks a bit like a hack. Maybe it is because we should have a more
targeted test for `StateWithExecutionGraph` where we test when
`onGloballyTerminalState` is called.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
##########
@@ -155,11 +155,53 @@ public void
testTransitionToFinishedOnFailedExecutionGraph() throws Exception {
}
}
+ @Test
+ public void testEnsureOnlyGlobalTerminalStateCallbacksNoCall() throws
Exception {
+ // modify MockExecutingContext to disable the "runIfState()" check
+ MockExecutingContext ctx =
+ new MockExecutingContext() {
+ @Override
+ public void runIfState(State expectedState, Runnable
action) {
+ action.run();
+ }
+ };
+ MockedExecuting exec = new ExecutingStateBuilder().build(ctx);
+
+ ctx.setExpectFinished(
+ archivedExecutionGraph ->
+ assertThat(archivedExecutionGraph.getState(),
is(JobStatus.SUSPENDED)));
+ exec.onEnter();
+ exec.suspend(
+ new RuntimeException("suspend")); // trigger transition to
non-global terminal state
+ ctx.close(); // manually close context to trigger callbacks
+
+ assertThat(exec.isOnGloballyTerminalStateCalled(), is(false));
+ }
+
+ @Test
+ public void testEnsureOnlyGlobalTerminalStateCallbacksWithCall() throws
Exception {
+ // modify MockExecutingContext to disable the "runIfState()" check
+ MockExecutingContext ctx =
+ new MockExecutingContext() {
+ @Override
+ public void runIfState(State expectedState, Runnable
action) {
+ action.run();
+ }
+ };
+ MockedExecuting exec = new ExecutingStateBuilder().build(ctx);
+ ctx.setExpectFinished(
+ archivedExecutionGraph ->
+ assertThat(archivedExecutionGraph.getState(),
is(JobStatus.FAILED)));
+ exec.getExecutionGraph()
+ .failJob(new RuntimeException()); // trigger transition to
global terminal state
+ ctx.close(); // manually close context to trigger callbacks
+ assertThat(exec.isOnGloballyTerminalStateCalled(), is(true));
+ }
Review comment:
Technically speaking, these tests belong rather to a
`StateWithExecutionGraphTest` which tests the general behaviour of the
`StateWithExecutionGraph`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
##########
@@ -308,7 +309,7 @@ abstract boolean updateTaskExecutionState(
*
* @param terminalState terminalState which the execution graph reached
*/
- abstract void onTerminalState(JobStatus terminalState);
+ abstract void onGloballyTerminalState(JobStatus terminalState);
Review comment:
We should update the JavaDocs.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]