This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cff1da89cbd4f35b12c61a3e109db4a55682e44
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Aug 1 11:51:02 2024 +0200

    [FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to execute proper 
lifecycle management (i.e. closing the scheduler before shutting down the main 
thread executor)
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 622 +++++++++++----------
 1 file changed, 336 insertions(+), 286 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 70babe57449..ab0b96fa765 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -119,6 +119,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -188,12 +190,55 @@ public class AdaptiveSchedulerTest {
 
     private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
 
+    private AdaptiveScheduler scheduler;
+
+    @BeforeEach
+    void before() {
+        scheduler = null;
+    }
+
+    @AfterEach
+    void after() {
+        closeInExecutorService(scheduler, singleThreadMainThreadExecutor);
+    }
+
+    private static void closeInExecutorService(
+            @Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor 
executor) {
+        if (scheduler != null) {
+            final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
+            executor.execute(
+                    () -> {
+                        try {
+                            // no matter what state the scheduler is in; we 
have to go to Finished
+                            // state to please the Preconditions of the close 
call
+                            if (scheduler.getState().getClass() != 
Finished.class) {
+                                scheduler.goToFinished(
+                                        scheduler.getArchivedExecutionGraph(
+                                                JobStatus.CANCELED, null));
+                            }
+                            FutureUtils.forward(scheduler.closeAsync(), 
closeFuture);
+                        } catch (Throwable t) {
+                            closeFuture.completeExceptionally(t);
+                        }
+                    });
+            assertThatFuture(closeFuture).eventuallySucceeds();
+        }
+    }
+
+    private void startTestInstanceInMainThread() {
+        runInMainThread(() -> scheduler.startScheduling());
+    }
+
+    private void runInMainThread(Runnable callback) {
+        CompletableFuture.runAsync(callback, 
singleThreadMainThreadExecutor).join();
+    }
+
     @Test
     void testInitialState() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -206,12 +251,14 @@ public class AdaptiveSchedulerTest {
         jobGraph.setSnapshotSettings(
                 new JobCheckpointingSettings(
                         CheckpointCoordinatorConfiguration.builder().build(), 
null));
-
-        final ArchivedExecutionGraph archivedExecutionGraph =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                        .build()
-                        .getArchivedExecutionGraph(JobStatus.INITIALIZING, 
null);
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .build();
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                scheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, 
null);
 
         
ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph);
     }
@@ -223,11 +270,14 @@ public class AdaptiveSchedulerTest {
                 new JobCheckpointingSettings(
                         CheckpointCoordinatorConfiguration.builder().build(), 
null));
 
-        final ArchivedExecutionGraph archivedExecutionGraph =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                        .build()
-                        .getArchivedExecutionGraph(JobStatus.INITIALIZING, 
null);
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .build();
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                scheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, 
null);
 
         ArchivedExecutionJobVertex jobVertex =
                 archivedExecutionGraph.getJobVertex(JOB_VERTEX.getID());
@@ -247,10 +297,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testIsState() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -262,10 +312,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testRunIfState() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -276,10 +326,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testRunIfStateWithStateMismatch() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -346,7 +396,7 @@ public class AdaptiveSchedulerTest {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -402,7 +452,7 @@ public class AdaptiveSchedulerTest {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
 
-        final AdaptiveScheduler adaptiveScheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -417,7 +467,7 @@ public class AdaptiveSchedulerTest {
 
         singleThreadMainThreadExecutor.execute(
                 () -> {
-                    adaptiveScheduler.startScheduling();
+                    scheduler.startScheduling();
                     offerSlots(
                             declarativeSlotPool,
                             createSlotOffersForResourceRequirements(
@@ -431,7 +481,7 @@ public class AdaptiveSchedulerTest {
 
         final ArchivedExecutionGraph executionGraph =
                 CompletableFuture.supplyAsync(
-                                () -> 
adaptiveScheduler.requestJob().getArchivedExecutionGraph(),
+                                () -> 
scheduler.requestJob().getArchivedExecutionGraph(),
                                 singleThreadMainThreadExecutor)
                         .join();
 
@@ -443,16 +493,16 @@ public class AdaptiveSchedulerTest {
     void testInitializationTimestampForwarding() throws Exception {
         final long expectedInitializationTimestamp = 42L;
 
-        final AdaptiveScheduler adaptiveScheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         
.setInitializationTimestamp(expectedInitializationTimestamp)
                         .build();
 
         final long initializationTimestamp =
-                adaptiveScheduler
+                scheduler
                         .requestJob()
                         .getArchivedExecutionGraph()
                         .getStatusTimestamp(JobStatus.INITIALIZING);
@@ -464,10 +514,10 @@ public class AdaptiveSchedulerTest {
     void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
         final TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setFatalErrorHandler(fatalErrorHandler)
                         .build();
@@ -544,7 +594,7 @@ public class AdaptiveSchedulerTest {
         final Configuration configuration = 
createConfigurationWithNoTimeouts();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -564,33 +614,30 @@ public class AdaptiveSchedulerTest {
 
         taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
 
-        singleThreadMainThreadExecutor.execute(
-                () -> {
-                    scheduler.startScheduling();
-
-                    declarativeSlotPool.offerSlots(
-                            createSlotOffersForResourceRequirements(
-                                    
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
-                            new LocalTaskManagerLocation(),
-                            taskManagerGateway,
-                            System.currentTimeMillis());
-                });
+        startTestInstanceInMainThread();
+        runInMainThread(
+                () ->
+                        declarativeSlotPool.offerSlots(
+                                createSlotOffersForResourceRequirements(
+                                        
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                                new LocalTaskManagerLocation(),
+                                taskManagerGateway,
+                                System.currentTimeMillis()));
 
         // wait for the first task submission
         taskManagerGateway.waitForSubmissions(1);
 
         assertThat(numRestartsMetric.getValue()).isEqualTo(0L);
 
-        singleThreadMainThreadExecutor.execute(
-                () -> {
-                    // offer more slots, which will cause a restart in order 
to scale up
-                    offerSlots(
-                            declarativeSlotPool,
-                            createSlotOffersForResourceRequirements(
-                                    ResourceCounter.withResource(
-                                            ResourceProfile.UNKNOWN, 
PARALLELISM)),
-                            taskManagerGateway);
-                });
+        // offer more slots, which will cause a restart in order to scale up
+        runInMainThread(
+                () ->
+                        offerSlots(
+                                declarativeSlotPool,
+                                createSlotOffersForResourceRequirements(
+                                        ResourceCounter.withResource(
+                                                ResourceProfile.UNKNOWN, 
PARALLELISM)),
+                                taskManagerGateway));
 
         // wait for the second task submissions
         taskManagerGateway.waitForSubmissions(PARALLELISM);
@@ -634,7 +681,7 @@ public class AdaptiveSchedulerTest {
                 MetricOptions.JOB_STATUS_METRICS,
                 Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -656,16 +703,14 @@ public class AdaptiveSchedulerTest {
 
         taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
 
-        singleThreadMainThreadExecutor.execute(
-                () -> {
-                    scheduler.startScheduling();
-
-                    offerSlots(
-                            declarativeSlotPool,
-                            createSlotOffersForResourceRequirements(
-                                    
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
-                            taskManagerGateway);
-                });
+        startTestInstanceInMainThread();
+        runInMainThread(
+                () ->
+                        offerSlots(
+                                declarativeSlotPool,
+                                createSlotOffersForResourceRequirements(
+                                        
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                                taskManagerGateway));
 
         // wait for the first task submission
         taskManagerGateway.waitForSubmissions(1);
@@ -674,15 +719,14 @@ public class AdaptiveSchedulerTest {
         assertThat(downTimeGauge.getValue()).isEqualTo(0L);
         assertThat(restartTimeGauge.getValue()).isEqualTo(0L);
 
-        singleThreadMainThreadExecutor.execute(
-                () -> {
-                    // offer more slots, which will cause a restart in order 
to scale up
-                    offerSlots(
-                            declarativeSlotPool,
-                            createSlotOffersForResourceRequirements(
-                                    
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
-                            taskManagerGateway);
-                });
+        // offer more slots, which will cause a restart in order to scale up
+        runInMainThread(
+                () ->
+                        offerSlots(
+                                declarativeSlotPool,
+                                createSlotOffersForResourceRequirements(
+                                        
ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                                taskManagerGateway));
 
         // wait for the second task submissions
         taskManagerGateway.waitForSubmissions(2);
@@ -699,14 +743,14 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testStartSchedulingTransitionsToWaitingForResources() throws 
Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
-        scheduler.startScheduling();
+        startTestInstanceInMainThread();
 
         
assertThat(scheduler.getState()).isInstanceOf(WaitingForResources.class);
     }
@@ -718,13 +762,15 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
-        scheduler.startScheduling();
+        startTestInstanceInMainThread();
 
         assertThat(declarativeSlotPool.getResourceRequirements())
                 .contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, 
PARALLELISM));
@@ -740,14 +786,16 @@ public class AdaptiveSchedulerTest {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .setJobMasterConfiguration(configuration)
                         .build();
 
-        scheduler.startScheduling();
+        startTestInstanceInMainThread();
 
         // should request the max possible resources
         final int expectedParallelism =
@@ -767,7 +815,7 @@ public class AdaptiveSchedulerTest {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -809,17 +857,17 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testGoToFinished() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
         final ArchivedExecutionGraph archivedExecutionGraph =
                 new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
 
-        scheduler.goToFinished(archivedExecutionGraph);
+        runInMainThread(() -> scheduler.goToFinished(archivedExecutionGraph));
 
         assertThat(scheduler.getState()).isInstanceOf(Finished.class);
     }
@@ -827,10 +875,10 @@ public class AdaptiveSchedulerTest {
     @Test
     void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {
         final AtomicInteger numStatusUpdates = new AtomicInteger();
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setJobStatusListener(
                                 (jobId, newJobStatus, timestamp) ->
@@ -843,7 +891,8 @@ public class AdaptiveSchedulerTest {
                 .isEqualTo(JobStatus.INITIALIZING);
 
         // transition into next state, for which the job state is still 
INITIALIZING
-        scheduler.transitionToState(new 
DummyState.Factory(JobStatus.INITIALIZING));
+        runInMainThread(
+                () -> scheduler.transitionToState(new 
DummyState.Factory(JobStatus.INITIALIZING)));
 
         assertThat(numStatusUpdates).hasValue(0);
     }
@@ -862,7 +911,7 @@ public class AdaptiveSchedulerTest {
         final CompletableFuture<Void> jobFinishedNotification = new 
CompletableFuture<>();
         final CompletableFuture<JobStatus> unexpectedJobStatusNotification =
                 new CompletableFuture<>();
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -890,10 +939,9 @@ public class AdaptiveSchedulerTest {
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
 
-        singleThreadMainThreadExecutor.execute(
+        runInMainThread(
                 () -> {
                     scheduler.startScheduling();
-
                     offerSlots(
                             declarativeSlotPool,
                             createSlotOffersForResourceRequirements(
@@ -905,7 +953,7 @@ public class AdaptiveSchedulerTest {
         final TaskDeploymentDescriptor submittedTask = 
taskManagerGateway.submittedTasks.take();
 
         // let the job finish
-        singleThreadMainThreadExecutor.execute(
+        runInMainThread(
                 () ->
                         scheduler.updateTaskExecutionState(
                                 new TaskExecutionState(
@@ -939,7 +987,7 @@ public class AdaptiveSchedulerTest {
                 new JobCheckpointingSettings(
                         CheckpointCoordinatorConfiguration.builder().build(), 
null));
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -949,13 +997,17 @@ public class AdaptiveSchedulerTest {
                                         completedCheckpointStore, 
checkpointIdCounter))
                         .build();
 
-        singleThreadMainThreadExecutor.execute(
+        startTestInstanceInMainThread();
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        runInMainThread(
                 () -> {
-                    scheduler.startScheduling();
                     // transition into the FAILED state
                     scheduler.handleGlobalFailure(new FlinkException("Test 
exception"));
-                    scheduler.closeAsync();
+                    // we shouldn't block the closeAsync call here because it 
will trigger
+                    // additional task on the main thread internally
+                    FutureUtils.forward(scheduler.closeAsync(), closeFuture);
                 });
+        closeFuture.join();
 
         
assertThat(completedCheckpointStoreShutdownFuture.get()).isEqualTo(JobStatus.FAILED);
         
assertThat(checkpointIdCounterShutdownFuture.get()).isEqualTo(JobStatus.FAILED);
@@ -963,20 +1015,21 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testTransitionToStateCallsOnLeave() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
         final LifecycleMethodCapturingState firstState = new 
LifecycleMethodCapturingState();
 
-        scheduler.transitionToState(new StateInstanceFactory(firstState));
+        runInMainThread(() -> scheduler.transitionToState(new 
StateInstanceFactory(firstState)));
 
         firstState.reset();
 
-        scheduler.transitionToState(new DummyState.Factory());
+        runInMainThread(() -> scheduler.transitionToState(new 
DummyState.Factory()));
+
         assertThat(firstState.onLeaveCalled).isTrue();
         
assertThat(firstState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue();
     }
@@ -992,7 +1045,7 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -1060,8 +1113,7 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final AdaptiveScheduler scheduler =
-                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
+        scheduler = createSchedulerWithNoTimeouts(jobGraph, 
declarativeSlotPool);
 
         final int scaledUpParallelism = PARALLELISM * 2;
 
@@ -1096,8 +1148,7 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final AdaptiveScheduler scheduler =
-                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
+        scheduler = createSchedulerWithNoTimeouts(jobGraph, 
declarativeSlotPool);
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 createSubmissionBufferingTaskManagerGateway(PARALLELISM, 
scheduler);
@@ -1123,8 +1174,7 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final AdaptiveScheduler scheduler =
-                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
+        scheduler = createSchedulerWithNoTimeouts(jobGraph, 
declarativeSlotPool);
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 createSubmissionBufferingTaskManagerGateway(PARALLELISM, 
scheduler);
@@ -1160,8 +1210,7 @@ public class AdaptiveSchedulerTest {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
-        final AdaptiveScheduler scheduler =
-                createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
+        scheduler = createSchedulerWithNoTimeouts(jobGraph, 
declarativeSlotPool);
         int scaledUpParallelism = PARALLELISM * 10;
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
@@ -1230,7 +1279,7 @@ public class AdaptiveSchedulerTest {
         JobResourceRequirements initialJobResourceRequirements =
                 
createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM);
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool)
                         .withConfigurationOverride(
                                 conf -> {
@@ -1268,7 +1317,7 @@ public class AdaptiveSchedulerTest {
         JobResourceRequirements initialJobResourceRequirements =
                 
createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM);
 
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool)
                         
.setJobResourceRequirements(initialJobResourceRequirements)
                         .build();
@@ -1739,10 +1788,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testRepeatedTransitionIntoCurrentStateFails() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -1761,10 +1810,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testTriggerSavepointFailsInIllegalState() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -1777,10 +1826,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testStopWithSavepointFailsInIllegalState() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -1793,10 +1842,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws 
Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -1811,10 +1860,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() 
throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -1826,11 +1875,13 @@ public class AdaptiveSchedulerTest {
     }
 
     @Test
-    void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws 
Exception {
+    void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws 
Throwable {
         final JobGraph jobGraph = createJobGraph();
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
         assertThat(
@@ -1844,10 +1895,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testRequestNextInputSplitFailsInIllegalState() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -1860,10 +1911,10 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testRequestPartitionStateFailsInIllegalState() throws Exception {
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .build();
 
@@ -1883,16 +1934,16 @@ public class AdaptiveSchedulerTest {
                         .setTryReserveResourcesFunction(ignored -> 
Optional.empty())
                         .build();
 
-        final AdaptiveScheduler adaptiveScheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setSlotAllocator(slotAllocator)
                         .build();
 
         final CreatingExecutionGraph.AssignmentResult assignmentResult =
-                adaptiveScheduler.tryToAssignSlots(
+                scheduler.tryToAssignSlots(
                         
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                                 new StateTrackingMockExecutionGraph(), 
JobSchedulingPlan.empty()));
 
@@ -1977,7 +2028,7 @@ public class AdaptiveSchedulerTest {
 
         final DeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID(), 
slotIdleTimeout);
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 jobGraph,
                                 singleThreadMainThreadExecutor,
@@ -1986,67 +2037,60 @@ public class AdaptiveSchedulerTest {
                         .setJobMasterConfiguration(configuration)
                         .build();
 
-        try {
-            final int numInitialSlots = 4;
-            final int numSlotsAfterDownscaling = 2;
+        final int numInitialSlots = 4;
+        final int numSlotsAfterDownscaling = 2;
 
-            final SubmissionBufferingTaskManagerGateway taskManagerGateway =
-                    new SubmissionBufferingTaskManagerGateway(numInitialSlots);
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numInitialSlots);
 
-            
taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
+        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
 
-            singleThreadMainThreadExecutor.execute(
-                    () -> {
-                        scheduler.startScheduling();
-                        offerSlots(
-                                declarativeSlotPool,
-                                createSlotOffersForResourceRequirements(
-                                        ResourceCounter.withResource(
-                                                ResourceProfile.UNKNOWN, 
numInitialSlots)),
-                                taskManagerGateway);
-                    });
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
numInitialSlots)),
+                            taskManagerGateway);
+                });
 
-            // wait for all tasks to be submitted
-            taskManagerGateway.waitForSubmissions(numInitialSlots);
+        // wait for all tasks to be submitted
+        taskManagerGateway.waitForSubmissions(numInitialSlots);
 
-            // lower the resource requirements
-            singleThreadMainThreadExecutor.execute(
-                    () ->
-                            scheduler.updateJobResourceRequirements(
-                                    JobResourceRequirements.newBuilder()
-                                            .setParallelismForJobVertex(
-                                                    JOB_VERTEX.getID(), 1, 
numSlotsAfterDownscaling)
-                                            .build()));
+        // lower the resource requirements
+        singleThreadMainThreadExecutor.execute(
+                () ->
+                        scheduler.updateJobResourceRequirements(
+                                JobResourceRequirements.newBuilder()
+                                        .setParallelismForJobVertex(
+                                                JOB_VERTEX.getID(), 1, 
numSlotsAfterDownscaling)
+                                        .build()));
 
-            // job should be resubmitted with lower parallelism
-            taskManagerGateway.waitForSubmissions(numSlotsAfterDownscaling);
+        // job should be resubmitted with lower parallelism
+        taskManagerGateway.waitForSubmissions(numSlotsAfterDownscaling);
 
-            // and excessive slots should be freed
-            taskManagerGateway.waitForFreedSlots(numInitialSlots - 
numSlotsAfterDownscaling);
+        // and excessive slots should be freed
+        taskManagerGateway.waitForFreedSlots(numInitialSlots - 
numSlotsAfterDownscaling);
 
-            final CompletableFuture<JobStatus> jobStatusFuture = new 
CompletableFuture<>();
-            singleThreadMainThreadExecutor.execute(
-                    () -> 
jobStatusFuture.complete(scheduler.getState().getJobStatus()));
-            
assertThatFuture(jobStatusFuture).eventuallySucceeds().isEqualTo(JobStatus.RUNNING);
+        final CompletableFuture<JobStatus> jobStatusFuture = new 
CompletableFuture<>();
+        singleThreadMainThreadExecutor.execute(
+                () -> 
jobStatusFuture.complete(scheduler.getState().getJobStatus()));
+        
assertThatFuture(jobStatusFuture).eventuallySucceeds().isEqualTo(JobStatus.RUNNING);
 
-            // make sure we haven't freed up any more slots
-            assertThat(taskManagerGateway.freedSlots).isEmpty();
-        } finally {
-            final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
-            singleThreadMainThreadExecutor.execute(
-                    () -> FutureUtils.forward(scheduler.closeAsync(), 
closeFuture));
-            assertThatFuture(closeFuture).eventuallySucceeds();
-        }
+        // make sure we haven't freed up any more slots
+        assertThat(taskManagerGateway.freedSlots).isEmpty();
     }
 
     @Test
     void testUpdateResourceRequirementsInReactiveModeIsNotSupported() throws 
Exception {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
-                                mainThreadExecutor,
+                                singleThreadMainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setJobMasterConfiguration(configuration)
                         .build();
@@ -2061,9 +2105,11 @@ public class AdaptiveSchedulerTest {
     void testRequestDefaultResourceRequirements() throws Exception {
         final JobGraph jobGraph = createJobGraph();
         final Configuration configuration = new Configuration();
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
                         .setJobMasterConfiguration(configuration)
                         .build();
         assertThat(scheduler.requestJobResourceRequirements())
@@ -2079,9 +2125,11 @@ public class AdaptiveSchedulerTest {
         final JobGraph jobGraph = createJobGraph();
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
                         .setJobMasterConfiguration(configuration)
                         .build();
         assertThat(scheduler.requestJobResourceRequirements())
@@ -2098,9 +2146,11 @@ public class AdaptiveSchedulerTest {
     void testRequestUpdatedResourceRequirements() throws Exception {
         final JobGraph jobGraph = createJobGraph();
         final Configuration configuration = new Configuration();
-        final AdaptiveScheduler scheduler =
+        scheduler =
                 new AdaptiveSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
                         .setJobMasterConfiguration(configuration)
                         .build();
         final JobResourceRequirements newJobResourceRequirements =
@@ -2159,51 +2209,42 @@ public class AdaptiveSchedulerTest {
                 new CompletableFuture<>();
         final BlockingQueue<Integer> eventQueue = new ArrayBlockingQueue<>(1);
 
-        final AdaptiveScheduler testInstance =
+        scheduler =
                 createSchedulerThatReachesExecutingState(
                         PARALLELISM,
                         triggerOnFailedCheckpointCount,
                         eventQueue,
                         statsListenerInstantiatedFuture);
 
-        try {
-            // start scheduling to reach Executing state
-            
singleThreadMainThreadExecutor.execute(testInstance::startScheduling);
+        // start scheduling to reach Executing state
+        singleThreadMainThreadExecutor.execute(scheduler::startScheduling);
 
-            final CheckpointStatsListener statsListener = 
statsListenerInstantiatedFuture.get();
-            assertThat(statsListener)
-                    .as("The CheckpointStatsListener should have been 
instantiated.")
-                    .isNotNull();
+        final CheckpointStatsListener statsListener = 
statsListenerInstantiatedFuture.get();
+        assertThat(statsListener)
+                .as("The CheckpointStatsListener should have been 
instantiated.")
+                .isNotNull();
 
-            // the first trigger happens in the Executing initialization - 
let's wait for that event
-            // to pass
-            assertThat(eventQueue.take())
-                    .as(
-                            "The first event should have been appeared during 
Executing state initialization and should be ignored.")
-                    .isEqualTo(0);
-
-            // counting the failed checkpoints only starts on a change event
-            testInstance.updateJobResourceRequirements(
-                    JobResourceRequirements.newBuilder()
-                            .setParallelismForJobVertex(JOB_VERTEX.getID(), 1, 
PARALLELISM - 1)
-                            .build());
-
-            for (int i = 0; i < eventRepetitions; i++) {
-                assertThatNoException()
-                        .as(
-                                "Triggering the event from outside the main 
thread should not have caused an error.")
-                        .isThrownBy(() -> eventCallback.accept(statsListener));
-            }
+        // the first trigger happens in the Executing initialization - let's 
wait for that event
+        // to pass
+        assertThat(eventQueue.take())
+                .as(
+                        "The first event should have been appeared during 
Executing state initialization and should be ignored.")
+                .isEqualTo(0);
 
-            assertThat(eventQueue.take())
-                    .as("Only one event should have been observed.")
-                    .isEqualTo(1);
-        } finally {
-            final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
-            singleThreadMainThreadExecutor.execute(
-                    () -> FutureUtils.forward(testInstance.closeAsync(), 
closeFuture));
-            assertThatFuture(closeFuture).eventuallySucceeds();
+        // counting the failed checkpoints only starts on a change event
+        scheduler.updateJobResourceRequirements(
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(JOB_VERTEX.getID(), 1, 
PARALLELISM - 1)
+                        .build());
+
+        for (int i = 0; i < eventRepetitions; i++) {
+            assertThatNoException()
+                    .as(
+                            "Triggering the event from outside the main thread 
should not have caused an error.")
+                    .isThrownBy(() -> eventCallback.accept(statsListener));
         }
+
+        assertThat(eventQueue.take()).as("Only one event should have been 
observed.").isEqualTo(1);
     }
 
     // 
---------------------------------------------------------------------------------------------
@@ -2258,28 +2299,33 @@ public class AdaptiveSchedulerTest {
                         .build();
 
         final AtomicInteger eventCounter = new AtomicInteger();
-        return new AdaptiveSchedulerBuilder(
-                        jobGraph, singleThreadMainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                .setJobMasterConfiguration(config)
-                .setDeclarativeSlotPool(slotPool)
-                .setRescaleManagerFactory(
-                        new TestingRescaleManager.Factory(
-                                () -> {},
-                                () -> {
-                                    
singleThreadMainThreadExecutor.assertRunningInMainThread();
-
-                                    
eventQueue.offer(eventCounter.getAndIncrement());
-                                }))
-                .setCheckpointStatsTrackerFactory(
-                        (metricGroup, listener) -> {
-                            assertThat(statsListenerInstantiatedFuture)
-                                    .as(
-                                            "The CheckpointStatsListener 
should be only instantiated once.")
-                                    .isNotCompleted();
-                            statsListenerInstantiatedFuture.complete(listener);
-                            return NoOpCheckpointStatsTracker.INSTANCE;
-                        })
-                .build();
+        scheduler =
+                new AdaptiveSchedulerBuilder(
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .setJobMasterConfiguration(config)
+                        .setDeclarativeSlotPool(slotPool)
+                        .setRescaleManagerFactory(
+                                new TestingRescaleManager.Factory(
+                                        () -> {},
+                                        () -> {
+                                            singleThreadMainThreadExecutor
+                                                    
.assertRunningInMainThread();
+
+                                            
eventQueue.offer(eventCounter.getAndIncrement());
+                                        }))
+                        .setCheckpointStatsTrackerFactory(
+                                (metricGroup, listener) -> {
+                                    assertThat(statsListenerInstantiatedFuture)
+                                            .as(
+                                                    "The 
CheckpointStatsListener should be only instantiated once.")
+                                            .isNotCompleted();
+                                    
statsListenerInstantiatedFuture.complete(listener);
+                                    return NoOpCheckpointStatsTracker.INSTANCE;
+                                })
+                        .build();
+        return scheduler;
     }
 
     private CompletableFuture<ArchivedExecutionGraph> 
getArchivedExecutionGraphForRunningJob(
@@ -2551,57 +2597,61 @@ public class AdaptiveSchedulerTest {
             schedulerModifier.accept(builder);
             final AdaptiveScheduler scheduler = builder.build();
 
-            final SubmissionBufferingTaskManagerGateway taskManagerGateway =
-                    new SubmissionBufferingTaskManagerGateway(PARALLELISM);
-            taskManagerGateway.setCancelConsumer(
-                    attemptId ->
-                            mainThreadExecutor.execute(
-                                    () ->
-                                            scheduler.updateTaskExecutionState(
-                                                    new 
TaskExecutionStateTransition(
-                                                            new 
TaskExecutionState(
-                                                                    attemptId,
-                                                                    
ExecutionState.CANCELED,
-                                                                    null)))));
-
-            mainThreadExecutor.execute(
-                    () -> {
-                        scheduler.startScheduling();
-                        offerSlots(
-                                declarativeSlotPool,
-                                createSlotOffersForResourceRequirements(
-                                        ResourceCounter.withResource(
-                                                ResourceProfile.UNKNOWN, 
PARALLELISM)),
-                                taskManagerGateway);
-                    });
-            // wait for all tasks to be deployed this is important because 
some tests trigger
-            // savepoints these only properly work if the deployment has been 
started
-            taskManagerGateway.waitForSubmissions(PARALLELISM);
-
-            CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture =
-                    new CompletableFuture<>();
-            mainThreadExecutor.execute(
-                    () ->
-                            vertexFuture.complete(
-                                    scheduler
-                                            .requestJob()
-                                            .getArchivedExecutionGraph()
-                                            .getAllExecutionVertices()));
-            final Iterable<ArchivedExecutionVertex> executionVertices = 
vertexFuture.get();
-            final List<ExecutionAttemptID> attemptIds =
-                    IterableUtils.toStream(executionVertices)
-                            
.map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
-                            .map(ArchivedExecution::getAttemptId)
-                            .collect(Collectors.toList());
-            CompletableFuture<Void> runTestLogicFuture =
-                    CompletableFuture.runAsync(
-                            () -> testLogic.accept(scheduler, attemptIds), 
mainThreadExecutor);
-            runTestLogicFuture.get();
-
-            mainThreadExecutor.execute(scheduler::cancel);
-            scheduler.getJobTerminationFuture().get();
-
-            return scheduler.requestJob().getExceptionHistory();
+            try {
+                final SubmissionBufferingTaskManagerGateway taskManagerGateway 
=
+                        new SubmissionBufferingTaskManagerGateway(PARALLELISM);
+                taskManagerGateway.setCancelConsumer(
+                        attemptId ->
+                                mainThreadExecutor.execute(
+                                        () ->
+                                                
scheduler.updateTaskExecutionState(
+                                                        new 
TaskExecutionStateTransition(
+                                                                new 
TaskExecutionState(
+                                                                        
attemptId,
+                                                                        
ExecutionState.CANCELED,
+                                                                        
null)))));
+
+                mainThreadExecutor.execute(
+                        () -> {
+                            scheduler.startScheduling();
+                            offerSlots(
+                                    declarativeSlotPool,
+                                    createSlotOffersForResourceRequirements(
+                                            ResourceCounter.withResource(
+                                                    ResourceProfile.UNKNOWN, 
PARALLELISM)),
+                                    taskManagerGateway);
+                        });
+                // wait for all tasks to be deployed this is important because 
some tests trigger
+                // savepoints these only properly work if the deployment has 
been started
+                taskManagerGateway.waitForSubmissions(PARALLELISM);
+
+                CompletableFuture<Iterable<ArchivedExecutionVertex>> 
vertexFuture =
+                        new CompletableFuture<>();
+                mainThreadExecutor.execute(
+                        () ->
+                                vertexFuture.complete(
+                                        scheduler
+                                                .requestJob()
+                                                .getArchivedExecutionGraph()
+                                                .getAllExecutionVertices()));
+                final Iterable<ArchivedExecutionVertex> executionVertices = 
vertexFuture.get();
+                final List<ExecutionAttemptID> attemptIds =
+                        IterableUtils.toStream(executionVertices)
+                                
.map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+                                .map(ArchivedExecution::getAttemptId)
+                                .collect(Collectors.toList());
+                CompletableFuture<Void> runTestLogicFuture =
+                        CompletableFuture.runAsync(
+                                () -> testLogic.accept(scheduler, attemptIds), 
mainThreadExecutor);
+                runTestLogicFuture.get();
+
+                mainThreadExecutor.execute(scheduler::cancel);
+                scheduler.getJobTerminationFuture().get();
+
+                return scheduler.requestJob().getExceptionHistory();
+            } finally {
+                AdaptiveSchedulerTest.closeInExecutorService(scheduler, 
mainThreadExecutor);
+            }
         }
     }
 

Reply via email to