This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7cff1da89cbd4f35b12c61a3e109db4a55682e44 Author: Matthias Pohl <[email protected]> AuthorDate: Thu Aug 1 11:51:02 2024 +0200 [FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to execute proper lifecycle management (i.e. closing the scheduler before shutting down the main thread executor) --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 622 +++++++++++---------- 1 file changed, 336 insertions(+), 286 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 70babe57449..ab0b96fa765 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -119,6 +119,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.jackson.JacksonMapperFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; @@ -188,12 +190,55 @@ public class AdaptiveSchedulerTest { private final ClassLoader classLoader = ClassLoader.getSystemClassLoader(); + private AdaptiveScheduler scheduler; + + @BeforeEach + void before() { + scheduler = null; + } + + @AfterEach + void after() { + closeInExecutorService(scheduler, singleThreadMainThreadExecutor); + } + + private static void closeInExecutorService( + @Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor executor) { + if (scheduler != null) { + final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); + executor.execute( + () -> { + try { + // no matter what state the scheduler is in; we have to go to Finished + // state to please the Preconditions of the close call + if (scheduler.getState().getClass() != Finished.class) { + scheduler.goToFinished( + scheduler.getArchivedExecutionGraph( + JobStatus.CANCELED, null)); + } + FutureUtils.forward(scheduler.closeAsync(), closeFuture); + } catch (Throwable t) { + closeFuture.completeExceptionally(t); + } + }); + assertThatFuture(closeFuture).eventuallySucceeds(); + } + } + + private void startTestInstanceInMainThread() { + runInMainThread(() -> scheduler.startScheduling()); + } + + private void runInMainThread(Runnable callback) { + CompletableFuture.runAsync(callback, singleThreadMainThreadExecutor).join(); + } + @Test void testInitialState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -206,12 +251,14 @@ public class AdaptiveSchedulerTest { jobGraph.setSnapshotSettings( new JobCheckpointingSettings( CheckpointCoordinatorConfiguration.builder().build(), null)); - - final ArchivedExecutionGraph archivedExecutionGraph = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) - .build() - .getArchivedExecutionGraph(JobStatus.INITIALIZING, null); + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .build(); + final ArchivedExecutionGraph archivedExecutionGraph = + scheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, null); ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph); } @@ -223,11 +270,14 @@ public class AdaptiveSchedulerTest { new JobCheckpointingSettings( CheckpointCoordinatorConfiguration.builder().build(), null)); - final ArchivedExecutionGraph archivedExecutionGraph = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) - .build() - .getArchivedExecutionGraph(JobStatus.INITIALIZING, null); + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .build(); + final ArchivedExecutionGraph archivedExecutionGraph = + scheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, null); ArchivedExecutionJobVertex jobVertex = archivedExecutionGraph.getJobVertex(JOB_VERTEX.getID()); @@ -247,10 +297,10 @@ public class AdaptiveSchedulerTest { @Test void testIsState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -262,10 +312,10 @@ public class AdaptiveSchedulerTest { @Test void testRunIfState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -276,10 +326,10 @@ public class AdaptiveSchedulerTest { @Test void testRunIfStateWithStateMismatch() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -346,7 +396,7 @@ public class AdaptiveSchedulerTest { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -402,7 +452,7 @@ public class AdaptiveSchedulerTest { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); - final AdaptiveScheduler adaptiveScheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -417,7 +467,7 @@ public class AdaptiveSchedulerTest { singleThreadMainThreadExecutor.execute( () -> { - adaptiveScheduler.startScheduling(); + scheduler.startScheduling(); offerSlots( declarativeSlotPool, createSlotOffersForResourceRequirements( @@ -431,7 +481,7 @@ public class AdaptiveSchedulerTest { final ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync( - () -> adaptiveScheduler.requestJob().getArchivedExecutionGraph(), + () -> scheduler.requestJob().getArchivedExecutionGraph(), singleThreadMainThreadExecutor) .join(); @@ -443,16 +493,16 @@ public class AdaptiveSchedulerTest { void testInitializationTimestampForwarding() throws Exception { final long expectedInitializationTimestamp = 42L; - final AdaptiveScheduler adaptiveScheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setInitializationTimestamp(expectedInitializationTimestamp) .build(); final long initializationTimestamp = - adaptiveScheduler + scheduler .requestJob() .getArchivedExecutionGraph() .getStatusTimestamp(JobStatus.INITIALIZING); @@ -464,10 +514,10 @@ public class AdaptiveSchedulerTest { void testFatalErrorsForwardedToFatalErrorHandler() throws Exception { final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setFatalErrorHandler(fatalErrorHandler) .build(); @@ -544,7 +594,7 @@ public class AdaptiveSchedulerTest { final Configuration configuration = createConfigurationWithNoTimeouts(); configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -564,33 +614,30 @@ public class AdaptiveSchedulerTest { taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler)); - singleThreadMainThreadExecutor.execute( - () -> { - scheduler.startScheduling(); - - declarativeSlotPool.offerSlots( - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), - new LocalTaskManagerLocation(), - taskManagerGateway, - System.currentTimeMillis()); - }); + startTestInstanceInMainThread(); + runInMainThread( + () -> + declarativeSlotPool.offerSlots( + createSlotOffersForResourceRequirements( + ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), + new LocalTaskManagerLocation(), + taskManagerGateway, + System.currentTimeMillis())); // wait for the first task submission taskManagerGateway.waitForSubmissions(1); assertThat(numRestartsMetric.getValue()).isEqualTo(0L); - singleThreadMainThreadExecutor.execute( - () -> { - // offer more slots, which will cause a restart in order to scale up - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource( - ResourceProfile.UNKNOWN, PARALLELISM)), - taskManagerGateway); - }); + // offer more slots, which will cause a restart in order to scale up + runInMainThread( + () -> + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, PARALLELISM)), + taskManagerGateway)); // wait for the second task submissions taskManagerGateway.waitForSubmissions(PARALLELISM); @@ -634,7 +681,7 @@ public class AdaptiveSchedulerTest { MetricOptions.JOB_STATUS_METRICS, Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME)); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -656,16 +703,14 @@ public class AdaptiveSchedulerTest { taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler)); - singleThreadMainThreadExecutor.execute( - () -> { - scheduler.startScheduling(); - - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), - taskManagerGateway); - }); + startTestInstanceInMainThread(); + runInMainThread( + () -> + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), + taskManagerGateway)); // wait for the first task submission taskManagerGateway.waitForSubmissions(1); @@ -674,15 +719,14 @@ public class AdaptiveSchedulerTest { assertThat(downTimeGauge.getValue()).isEqualTo(0L); assertThat(restartTimeGauge.getValue()).isEqualTo(0L); - singleThreadMainThreadExecutor.execute( - () -> { - // offer more slots, which will cause a restart in order to scale up - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), - taskManagerGateway); - }); + // offer more slots, which will cause a restart in order to scale up + runInMainThread( + () -> + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), + taskManagerGateway)); // wait for the second task submissions taskManagerGateway.waitForSubmissions(2); @@ -699,14 +743,14 @@ public class AdaptiveSchedulerTest { @Test void testStartSchedulingTransitionsToWaitingForResources() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); - scheduler.startScheduling(); + startTestInstanceInMainThread(); assertThat(scheduler.getState()).isInstanceOf(WaitingForResources.class); } @@ -718,13 +762,15 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) .setDeclarativeSlotPool(declarativeSlotPool) .build(); - scheduler.startScheduling(); + startTestInstanceInMainThread(); assertThat(declarativeSlotPool.getResourceRequirements()) .contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM)); @@ -740,14 +786,16 @@ public class AdaptiveSchedulerTest { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) .setDeclarativeSlotPool(declarativeSlotPool) .setJobMasterConfiguration(configuration) .build(); - scheduler.startScheduling(); + startTestInstanceInMainThread(); // should request the max possible resources final int expectedParallelism = @@ -767,7 +815,7 @@ public class AdaptiveSchedulerTest { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -809,17 +857,17 @@ public class AdaptiveSchedulerTest { @Test void testGoToFinished() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); final ArchivedExecutionGraph archivedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build(); - scheduler.goToFinished(archivedExecutionGraph); + runInMainThread(() -> scheduler.goToFinished(archivedExecutionGraph)); assertThat(scheduler.getState()).isInstanceOf(Finished.class); } @@ -827,10 +875,10 @@ public class AdaptiveSchedulerTest { @Test void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception { final AtomicInteger numStatusUpdates = new AtomicInteger(); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setJobStatusListener( (jobId, newJobStatus, timestamp) -> @@ -843,7 +891,8 @@ public class AdaptiveSchedulerTest { .isEqualTo(JobStatus.INITIALIZING); // transition into next state, for which the job state is still INITIALIZING - scheduler.transitionToState(new DummyState.Factory(JobStatus.INITIALIZING)); + runInMainThread( + () -> scheduler.transitionToState(new DummyState.Factory(JobStatus.INITIALIZING))); assertThat(numStatusUpdates).hasValue(0); } @@ -862,7 +911,7 @@ public class AdaptiveSchedulerTest { final CompletableFuture<Void> jobFinishedNotification = new CompletableFuture<>(); final CompletableFuture<JobStatus> unexpectedJobStatusNotification = new CompletableFuture<>(); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -890,10 +939,9 @@ public class AdaptiveSchedulerTest { final SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM); - singleThreadMainThreadExecutor.execute( + runInMainThread( () -> { scheduler.startScheduling(); - offerSlots( declarativeSlotPool, createSlotOffersForResourceRequirements( @@ -905,7 +953,7 @@ public class AdaptiveSchedulerTest { final TaskDeploymentDescriptor submittedTask = taskManagerGateway.submittedTasks.take(); // let the job finish - singleThreadMainThreadExecutor.execute( + runInMainThread( () -> scheduler.updateTaskExecutionState( new TaskExecutionState( @@ -939,7 +987,7 @@ public class AdaptiveSchedulerTest { new JobCheckpointingSettings( CheckpointCoordinatorConfiguration.builder().build(), null)); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -949,13 +997,17 @@ public class AdaptiveSchedulerTest { completedCheckpointStore, checkpointIdCounter)) .build(); - singleThreadMainThreadExecutor.execute( + startTestInstanceInMainThread(); + final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); + runInMainThread( () -> { - scheduler.startScheduling(); // transition into the FAILED state scheduler.handleGlobalFailure(new FlinkException("Test exception")); - scheduler.closeAsync(); + // we shouldn't block the closeAsync call here because it will trigger + // additional task on the main thread internally + FutureUtils.forward(scheduler.closeAsync(), closeFuture); }); + closeFuture.join(); assertThat(completedCheckpointStoreShutdownFuture.get()).isEqualTo(JobStatus.FAILED); assertThat(checkpointIdCounterShutdownFuture.get()).isEqualTo(JobStatus.FAILED); @@ -963,20 +1015,21 @@ public class AdaptiveSchedulerTest { @Test void testTransitionToStateCallsOnLeave() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState(); - scheduler.transitionToState(new StateInstanceFactory(firstState)); + runInMainThread(() -> scheduler.transitionToState(new StateInstanceFactory(firstState))); firstState.reset(); - scheduler.transitionToState(new DummyState.Factory()); + runInMainThread(() -> scheduler.transitionToState(new DummyState.Factory())); + assertThat(firstState.onLeaveCalled).isTrue(); assertThat(firstState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue(); } @@ -992,7 +1045,7 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -1060,8 +1113,7 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final AdaptiveScheduler scheduler = - createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); + scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); final int scaledUpParallelism = PARALLELISM * 2; @@ -1096,8 +1148,7 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final AdaptiveScheduler scheduler = - createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); + scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); final SubmissionBufferingTaskManagerGateway taskManagerGateway = createSubmissionBufferingTaskManagerGateway(PARALLELISM, scheduler); @@ -1123,8 +1174,7 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final AdaptiveScheduler scheduler = - createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); + scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); final SubmissionBufferingTaskManagerGateway taskManagerGateway = createSubmissionBufferingTaskManagerGateway(PARALLELISM, scheduler); @@ -1160,8 +1210,7 @@ public class AdaptiveSchedulerTest { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); - final AdaptiveScheduler scheduler = - createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); + scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool); int scaledUpParallelism = PARALLELISM * 10; final SubmissionBufferingTaskManagerGateway taskManagerGateway = @@ -1230,7 +1279,7 @@ public class AdaptiveSchedulerTest { JobResourceRequirements initialJobResourceRequirements = createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM); - final AdaptiveScheduler scheduler = + scheduler = prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool) .withConfigurationOverride( conf -> { @@ -1268,7 +1317,7 @@ public class AdaptiveSchedulerTest { JobResourceRequirements initialJobResourceRequirements = createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM); - final AdaptiveScheduler scheduler = + scheduler = prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool) .setJobResourceRequirements(initialJobResourceRequirements) .build(); @@ -1739,10 +1788,10 @@ public class AdaptiveSchedulerTest { @Test void testRepeatedTransitionIntoCurrentStateFails() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -1761,10 +1810,10 @@ public class AdaptiveSchedulerTest { @Test void testTriggerSavepointFailsInIllegalState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -1777,10 +1826,10 @@ public class AdaptiveSchedulerTest { @Test void testStopWithSavepointFailsInIllegalState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -1793,10 +1842,10 @@ public class AdaptiveSchedulerTest { @Test void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -1811,10 +1860,10 @@ public class AdaptiveSchedulerTest { @Test void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -1826,11 +1875,13 @@ public class AdaptiveSchedulerTest { } @Test - void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception { + void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Throwable { final JobGraph jobGraph = createJobGraph(); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) .build(); assertThat( @@ -1844,10 +1895,10 @@ public class AdaptiveSchedulerTest { @Test void testRequestNextInputSplitFailsInIllegalState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -1860,10 +1911,10 @@ public class AdaptiveSchedulerTest { @Test void testRequestPartitionStateFailsInIllegalState() throws Exception { - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -1883,16 +1934,16 @@ public class AdaptiveSchedulerTest { .setTryReserveResourcesFunction(ignored -> Optional.empty()) .build(); - final AdaptiveScheduler adaptiveScheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setSlotAllocator(slotAllocator) .build(); final CreatingExecutionGraph.AssignmentResult assignmentResult = - adaptiveScheduler.tryToAssignSlots( + scheduler.tryToAssignSlots( CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( new StateTrackingMockExecutionGraph(), JobSchedulingPlan.empty())); @@ -1977,7 +2028,7 @@ public class AdaptiveSchedulerTest { final DeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID(), slotIdleTimeout); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( jobGraph, singleThreadMainThreadExecutor, @@ -1986,67 +2037,60 @@ public class AdaptiveSchedulerTest { .setJobMasterConfiguration(configuration) .build(); - try { - final int numInitialSlots = 4; - final int numSlotsAfterDownscaling = 2; + final int numInitialSlots = 4; + final int numSlotsAfterDownscaling = 2; - final SubmissionBufferingTaskManagerGateway taskManagerGateway = - new SubmissionBufferingTaskManagerGateway(numInitialSlots); + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(numInitialSlots); - taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler)); + taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler)); - singleThreadMainThreadExecutor.execute( - () -> { - scheduler.startScheduling(); - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource( - ResourceProfile.UNKNOWN, numInitialSlots)), - taskManagerGateway); - }); + singleThreadMainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, numInitialSlots)), + taskManagerGateway); + }); - // wait for all tasks to be submitted - taskManagerGateway.waitForSubmissions(numInitialSlots); + // wait for all tasks to be submitted + taskManagerGateway.waitForSubmissions(numInitialSlots); - // lower the resource requirements - singleThreadMainThreadExecutor.execute( - () -> - scheduler.updateJobResourceRequirements( - JobResourceRequirements.newBuilder() - .setParallelismForJobVertex( - JOB_VERTEX.getID(), 1, numSlotsAfterDownscaling) - .build())); + // lower the resource requirements + singleThreadMainThreadExecutor.execute( + () -> + scheduler.updateJobResourceRequirements( + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex( + JOB_VERTEX.getID(), 1, numSlotsAfterDownscaling) + .build())); - // job should be resubmitted with lower parallelism - taskManagerGateway.waitForSubmissions(numSlotsAfterDownscaling); + // job should be resubmitted with lower parallelism + taskManagerGateway.waitForSubmissions(numSlotsAfterDownscaling); - // and excessive slots should be freed - taskManagerGateway.waitForFreedSlots(numInitialSlots - numSlotsAfterDownscaling); + // and excessive slots should be freed + taskManagerGateway.waitForFreedSlots(numInitialSlots - numSlotsAfterDownscaling); - final CompletableFuture<JobStatus> jobStatusFuture = new CompletableFuture<>(); - singleThreadMainThreadExecutor.execute( - () -> jobStatusFuture.complete(scheduler.getState().getJobStatus())); - assertThatFuture(jobStatusFuture).eventuallySucceeds().isEqualTo(JobStatus.RUNNING); + final CompletableFuture<JobStatus> jobStatusFuture = new CompletableFuture<>(); + singleThreadMainThreadExecutor.execute( + () -> jobStatusFuture.complete(scheduler.getState().getJobStatus())); + assertThatFuture(jobStatusFuture).eventuallySucceeds().isEqualTo(JobStatus.RUNNING); - // make sure we haven't freed up any more slots - assertThat(taskManagerGateway.freedSlots).isEmpty(); - } finally { - final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); - singleThreadMainThreadExecutor.execute( - () -> FutureUtils.forward(scheduler.closeAsync(), closeFuture)); - assertThatFuture(closeFuture).eventuallySucceeds(); - } + // make sure we haven't freed up any more slots + assertThat(taskManagerGateway.freedSlots).isEmpty(); } @Test void testUpdateResourceRequirementsInReactiveModeIsNotSupported() throws Exception { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), - mainThreadExecutor, + singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setJobMasterConfiguration(configuration) .build(); @@ -2061,9 +2105,11 @@ public class AdaptiveSchedulerTest { void testRequestDefaultResourceRequirements() throws Exception { final JobGraph jobGraph = createJobGraph(); final Configuration configuration = new Configuration(); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) .setJobMasterConfiguration(configuration) .build(); assertThat(scheduler.requestJobResourceRequirements()) @@ -2079,9 +2125,11 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) .setJobMasterConfiguration(configuration) .build(); assertThat(scheduler.requestJobResourceRequirements()) @@ -2098,9 +2146,11 @@ public class AdaptiveSchedulerTest { void testRequestUpdatedResourceRequirements() throws Exception { final JobGraph jobGraph = createJobGraph(); final Configuration configuration = new Configuration(); - final AdaptiveScheduler scheduler = + scheduler = new AdaptiveSchedulerBuilder( - jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) .setJobMasterConfiguration(configuration) .build(); final JobResourceRequirements newJobResourceRequirements = @@ -2159,51 +2209,42 @@ public class AdaptiveSchedulerTest { new CompletableFuture<>(); final BlockingQueue<Integer> eventQueue = new ArrayBlockingQueue<>(1); - final AdaptiveScheduler testInstance = + scheduler = createSchedulerThatReachesExecutingState( PARALLELISM, triggerOnFailedCheckpointCount, eventQueue, statsListenerInstantiatedFuture); - try { - // start scheduling to reach Executing state - singleThreadMainThreadExecutor.execute(testInstance::startScheduling); + // start scheduling to reach Executing state + singleThreadMainThreadExecutor.execute(scheduler::startScheduling); - final CheckpointStatsListener statsListener = statsListenerInstantiatedFuture.get(); - assertThat(statsListener) - .as("The CheckpointStatsListener should have been instantiated.") - .isNotNull(); + final CheckpointStatsListener statsListener = statsListenerInstantiatedFuture.get(); + assertThat(statsListener) + .as("The CheckpointStatsListener should have been instantiated.") + .isNotNull(); - // the first trigger happens in the Executing initialization - let's wait for that event - // to pass - assertThat(eventQueue.take()) - .as( - "The first event should have been appeared during Executing state initialization and should be ignored.") - .isEqualTo(0); - - // counting the failed checkpoints only starts on a change event - testInstance.updateJobResourceRequirements( - JobResourceRequirements.newBuilder() - .setParallelismForJobVertex(JOB_VERTEX.getID(), 1, PARALLELISM - 1) - .build()); - - for (int i = 0; i < eventRepetitions; i++) { - assertThatNoException() - .as( - "Triggering the event from outside the main thread should not have caused an error.") - .isThrownBy(() -> eventCallback.accept(statsListener)); - } + // the first trigger happens in the Executing initialization - let's wait for that event + // to pass + assertThat(eventQueue.take()) + .as( + "The first event should have been appeared during Executing state initialization and should be ignored.") + .isEqualTo(0); - assertThat(eventQueue.take()) - .as("Only one event should have been observed.") - .isEqualTo(1); - } finally { - final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); - singleThreadMainThreadExecutor.execute( - () -> FutureUtils.forward(testInstance.closeAsync(), closeFuture)); - assertThatFuture(closeFuture).eventuallySucceeds(); + // counting the failed checkpoints only starts on a change event + scheduler.updateJobResourceRequirements( + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(JOB_VERTEX.getID(), 1, PARALLELISM - 1) + .build()); + + for (int i = 0; i < eventRepetitions; i++) { + assertThatNoException() + .as( + "Triggering the event from outside the main thread should not have caused an error.") + .isThrownBy(() -> eventCallback.accept(statsListener)); } + + assertThat(eventQueue.take()).as("Only one event should have been observed.").isEqualTo(1); } // --------------------------------------------------------------------------------------------- @@ -2258,28 +2299,33 @@ public class AdaptiveSchedulerTest { .build(); final AtomicInteger eventCounter = new AtomicInteger(); - return new AdaptiveSchedulerBuilder( - jobGraph, singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) - .setJobMasterConfiguration(config) - .setDeclarativeSlotPool(slotPool) - .setRescaleManagerFactory( - new TestingRescaleManager.Factory( - () -> {}, - () -> { - singleThreadMainThreadExecutor.assertRunningInMainThread(); - - eventQueue.offer(eventCounter.getAndIncrement()); - })) - .setCheckpointStatsTrackerFactory( - (metricGroup, listener) -> { - assertThat(statsListenerInstantiatedFuture) - .as( - "The CheckpointStatsListener should be only instantiated once.") - .isNotCompleted(); - statsListenerInstantiatedFuture.complete(listener); - return NoOpCheckpointStatsTracker.INSTANCE; - }) - .build(); + scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setJobMasterConfiguration(config) + .setDeclarativeSlotPool(slotPool) + .setRescaleManagerFactory( + new TestingRescaleManager.Factory( + () -> {}, + () -> { + singleThreadMainThreadExecutor + .assertRunningInMainThread(); + + eventQueue.offer(eventCounter.getAndIncrement()); + })) + .setCheckpointStatsTrackerFactory( + (metricGroup, listener) -> { + assertThat(statsListenerInstantiatedFuture) + .as( + "The CheckpointStatsListener should be only instantiated once.") + .isNotCompleted(); + statsListenerInstantiatedFuture.complete(listener); + return NoOpCheckpointStatsTracker.INSTANCE; + }) + .build(); + return scheduler; } private CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraphForRunningJob( @@ -2551,57 +2597,61 @@ public class AdaptiveSchedulerTest { schedulerModifier.accept(builder); final AdaptiveScheduler scheduler = builder.build(); - final SubmissionBufferingTaskManagerGateway taskManagerGateway = - new SubmissionBufferingTaskManagerGateway(PARALLELISM); - taskManagerGateway.setCancelConsumer( - attemptId -> - mainThreadExecutor.execute( - () -> - scheduler.updateTaskExecutionState( - new TaskExecutionStateTransition( - new TaskExecutionState( - attemptId, - ExecutionState.CANCELED, - null))))); - - mainThreadExecutor.execute( - () -> { - scheduler.startScheduling(); - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource( - ResourceProfile.UNKNOWN, PARALLELISM)), - taskManagerGateway); - }); - // wait for all tasks to be deployed this is important because some tests trigger - // savepoints these only properly work if the deployment has been started - taskManagerGateway.waitForSubmissions(PARALLELISM); - - CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture = - new CompletableFuture<>(); - mainThreadExecutor.execute( - () -> - vertexFuture.complete( - scheduler - .requestJob() - .getArchivedExecutionGraph() - .getAllExecutionVertices())); - final Iterable<ArchivedExecutionVertex> executionVertices = vertexFuture.get(); - final List<ExecutionAttemptID> attemptIds = - IterableUtils.toStream(executionVertices) - .map(ArchivedExecutionVertex::getCurrentExecutionAttempt) - .map(ArchivedExecution::getAttemptId) - .collect(Collectors.toList()); - CompletableFuture<Void> runTestLogicFuture = - CompletableFuture.runAsync( - () -> testLogic.accept(scheduler, attemptIds), mainThreadExecutor); - runTestLogicFuture.get(); - - mainThreadExecutor.execute(scheduler::cancel); - scheduler.getJobTerminationFuture().get(); - - return scheduler.requestJob().getExceptionHistory(); + try { + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(PARALLELISM); + taskManagerGateway.setCancelConsumer( + attemptId -> + mainThreadExecutor.execute( + () -> + scheduler.updateTaskExecutionState( + new TaskExecutionStateTransition( + new TaskExecutionState( + attemptId, + ExecutionState.CANCELED, + null))))); + + mainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, PARALLELISM)), + taskManagerGateway); + }); + // wait for all tasks to be deployed this is important because some tests trigger + // savepoints these only properly work if the deployment has been started + taskManagerGateway.waitForSubmissions(PARALLELISM); + + CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture = + new CompletableFuture<>(); + mainThreadExecutor.execute( + () -> + vertexFuture.complete( + scheduler + .requestJob() + .getArchivedExecutionGraph() + .getAllExecutionVertices())); + final Iterable<ArchivedExecutionVertex> executionVertices = vertexFuture.get(); + final List<ExecutionAttemptID> attemptIds = + IterableUtils.toStream(executionVertices) + .map(ArchivedExecutionVertex::getCurrentExecutionAttempt) + .map(ArchivedExecution::getAttemptId) + .collect(Collectors.toList()); + CompletableFuture<Void> runTestLogicFuture = + CompletableFuture.runAsync( + () -> testLogic.accept(scheduler, attemptIds), mainThreadExecutor); + runTestLogicFuture.get(); + + mainThreadExecutor.execute(scheduler::cancel); + scheduler.getJobTerminationFuture().get(); + + return scheduler.requestJob().getExceptionHistory(); + } finally { + AdaptiveSchedulerTest.closeInExecutorService(scheduler, mainThreadExecutor); + } } }
