echauchot commented on a change in pull request #18610:
URL: https://github.com/apache/flink/pull/18610#discussion_r805726109



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##########
@@ -155,6 +156,41 @@ private void testAssignSplitToUnregisterdReader(boolean 
fromCoordinatorExecutor)
                 "Cannot assign splits");
     }
 
+    @Test
+    public void testExceptionInRunnableFailsTheJob() throws 
InterruptedException {
+        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        // do not use ManuallyTriggeredScheduledExecutorService for 
coordinatorExecutor
+        // as it needs to use factory.newThread() so that the 
FailJobExceptionHandler is set
+        // use a custom ThreadPool to gain access to the underlying thread
+        // to wait for it before asserting
+        final ThreadAccessibleThreadPool coordinatorExecutor =
+                new ThreadAccessibleThreadPool(coordinatorThreadFactory);
+        SourceCoordinatorContext<MockSourceSplit> testingContext =
+                new SourceCoordinatorContext<>(
+                        coordinatorExecutor,
+                        manualWorkerExecutor,
+                        coordinatorThreadFactory,
+                        operatorCoordinatorContext,
+                        new MockSourceSplitSerializer(),
+                        splitSplitAssignmentTracker);
+
+        testingContext.runInCoordinatorThread(
+                () -> {
+                    throw new RuntimeException();
+                });
+
+        manualWorkerExecutor.triggerAll();
+        // testingContext.close(); already shutdowns coordinatorExecutor and 
blocks until tasks are
+        // finished
+        testingContext.close();
+        // wait that the thread handles the exception through the provided 
exception handler.
+        // Using ThreadPoolExecutor#awaitTermination is not enough to wait for 
exception handling.
+        // It just waits for the runnable
+        coordinatorExecutor.getSingleThread().join();

Review comment:
       I agree it is simpler and higher level. I did not do that because I did 
not want to change the signature of the existing test components but fair 
enough, I'll do that. Thanks for the suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to