zentol commented on code in PR #26095:
URL: https://github.com/apache/flink/pull/26095#discussion_r1941200128


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1993,6 +2008,61 @@ public synchronized CompletableFuture<Void> closeAsync() 
{
         schedulerClosed.get();
     }
 
+    // visible to expose test logic to other Scheduler test classes
+    public static void runCloseAsyncCompletesInMainThreadTest(
+            ScheduledExecutorService singleThreadExecutorService,
+            BiFunctionWithException<
+                            ComponentMainThreadExecutor, CheckpointsCleaner, 
SchedulerNG, Exception>
+                    schedulerFactory)
+            throws Exception {
+        final OneShotLatch cleanerCloseLatch = new OneShotLatch();
+        final CompletableFuture<Void> cleanerCloseFuture = new 
CompletableFuture<>();
+        final CheckpointsCleaner checkpointsCleaner =
+                new CheckpointsCleaner() {
+                    @Override
+                    public CompletableFuture<Void> closeAsync() {
+                        cleanerCloseLatch.trigger();
+                        return cleanerCloseFuture;
+                    }
+                };
+
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+        final SchedulerNG scheduler =
+                schedulerFactory.apply(mainThreadExecutor, checkpointsCleaner);
+
+        mainThreadExecutor.execute(scheduler::startScheduling);
+
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        mainThreadExecutor.execute(
+                () -> {
+                    // we shouldn't block the closeAsync call here because 
it's triggering
+                    // additional tasks on the main thread internally
+                    FutureUtils.forward(
+                            scheduler
+                                    .closeAsync()
+                                    .thenRun(
+                                            () -> {
+                                                
mainThreadExecutor.assertRunningInMainThread();
+                                            }),
+                            closeFuture);
+                });
+
+        // wait for the CheckpointsCleaner#close call to not complete the 
future prematurely
+        cleanerCloseLatch.await();
+
+        // there is a race condition between returning the future and 
completing it which is due to
+        // the fact that we are triggering the latch before returning the 
future. That gives a small
+        // chance that the future completion is executed too early causing the 
future composition to
+        // end up in the main thread which is what we prevent in this test
+        Thread.sleep(50);

Review Comment:
   This is just for increasing the odds that we test things at all, right? They 
test won't fail if we're unlucky



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to