tillrohrmann commented on a change in pull request #14879:
URL: https://github.com/apache/flink/pull/14879#discussion_r573705164



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredComponentMainThreadExecutor.java
##########
@@ -22,15 +22,11 @@
 public class ManuallyTriggeredComponentMainThreadExecutor
         extends ManuallyTriggeredScheduledExecutorService implements 
ComponentMainThreadExecutor {
 
-    private Thread executorThread = null;
+    private final Thread executorThread = Thread.currentThread();

Review comment:
       Hmm I was more thinking about making this configurable for the 
instantiator of this class. That way it would be obvious what a user of this 
class can expect from this class.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
##########
@@ -155,11 +155,53 @@ public void 
testTransitionToFinishedOnFailedExecutionGraph() throws Exception {
         }
     }
 
+    @Test
+    public void testEnsureOnlyGlobalTerminalStateCallbacksNoCall() throws 
Exception {
+        // modify MockExecutingContext to disable the "runIfState()" check
+        MockExecutingContext ctx =
+                new MockExecutingContext() {
+                    @Override
+                    public void runIfState(State expectedState, Runnable 
action) {
+                        action.run();
+                    }
+                };

Review comment:
       This looks a bit like a hack. Maybe it is because we should have a more 
targeted test for `StateWithExecutionGraph` where we test when 
`onGloballyTerminalState` is called.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
##########
@@ -155,11 +155,53 @@ public void 
testTransitionToFinishedOnFailedExecutionGraph() throws Exception {
         }
     }
 
+    @Test
+    public void testEnsureOnlyGlobalTerminalStateCallbacksNoCall() throws 
Exception {
+        // modify MockExecutingContext to disable the "runIfState()" check
+        MockExecutingContext ctx =
+                new MockExecutingContext() {
+                    @Override
+                    public void runIfState(State expectedState, Runnable 
action) {
+                        action.run();
+                    }
+                };
+        MockedExecuting exec = new ExecutingStateBuilder().build(ctx);
+
+        ctx.setExpectFinished(
+                archivedExecutionGraph ->
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED)));
+        exec.onEnter();
+        exec.suspend(
+                new RuntimeException("suspend")); // trigger transition to 
non-global terminal state
+        ctx.close(); // manually close context to trigger callbacks
+
+        assertThat(exec.isOnGloballyTerminalStateCalled(), is(false));
+    }
+
+    @Test
+    public void testEnsureOnlyGlobalTerminalStateCallbacksWithCall() throws 
Exception {
+        // modify MockExecutingContext to disable the "runIfState()" check
+        MockExecutingContext ctx =
+                new MockExecutingContext() {
+                    @Override
+                    public void runIfState(State expectedState, Runnable 
action) {
+                        action.run();
+                    }
+                };
+        MockedExecuting exec = new ExecutingStateBuilder().build(ctx);
+        ctx.setExpectFinished(
+                archivedExecutionGraph ->
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED)));
+        exec.getExecutionGraph()
+                .failJob(new RuntimeException()); // trigger transition to 
global terminal state
+        ctx.close(); // manually close context to trigger callbacks
+        assertThat(exec.isOnGloballyTerminalStateCalled(), is(true));
+    }

Review comment:
       Technically speaking, these tests belong rather to a 
`StateWithExecutionGraphTest` which tests the general behaviour of the 
`StateWithExecutionGraph`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
##########
@@ -308,7 +309,7 @@ abstract boolean updateTaskExecutionState(
      *
      * @param terminalState terminalState which the execution graph reached
      */
-    abstract void onTerminalState(JobStatus terminalState);
+    abstract void onGloballyTerminalState(JobStatus terminalState);

Review comment:
       We should update the JavaDocs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to