XComp commented on code in PR #26095:
URL: https://github.com/apache/flink/pull/26095#discussion_r1941511379
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1993,6 +2008,61 @@ public synchronized CompletableFuture<Void> closeAsync()
{
schedulerClosed.get();
}
+ // visible to expose test logic to other Scheduler test classes
+ public static void runCloseAsyncCompletesInMainThreadTest(
+ ScheduledExecutorService singleThreadExecutorService,
+ BiFunctionWithException<
+ ComponentMainThreadExecutor, CheckpointsCleaner,
SchedulerNG, Exception>
+ schedulerFactory)
+ throws Exception {
+ final OneShotLatch cleanerCloseLatch = new OneShotLatch();
+ final CompletableFuture<Void> cleanerCloseFuture = new
CompletableFuture<>();
+ final CheckpointsCleaner checkpointsCleaner =
+ new CheckpointsCleaner() {
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ cleanerCloseLatch.trigger();
+ return cleanerCloseFuture;
+ }
+ };
+
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+ final SchedulerNG scheduler =
+ schedulerFactory.apply(mainThreadExecutor, checkpointsCleaner);
+
+ mainThreadExecutor.execute(scheduler::startScheduling);
+
+ final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ mainThreadExecutor.execute(
+ () -> {
+ // we shouldn't block the closeAsync call here because
it's triggering
+ // additional tasks on the main thread internally
+ FutureUtils.forward(
+ scheduler
+ .closeAsync()
+ .thenRun(
+ () -> {
+
mainThreadExecutor.assertRunningInMainThread();
+ }),
+ closeFuture);
+ });
+
+ // wait for the CheckpointsCleaner#close call to not complete the
future prematurely
+ cleanerCloseLatch.await();
+
+ // there is a race condition between returning the future and
completing it which is due to
+ // the fact that we are triggering the latch before returning the
future. That gives a small
+ // chance that the future completion is executed too early causing the
future composition to
+ // end up in the main thread which is what we prevent in this test
+ Thread.sleep(50);
Review Comment:
Correct. Actually, this line makes sure that the test would
definitely/most-likely fail if the fix is not in place.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]