This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b67f0a70662819e064176a418ce2b892f3fb61b4 Author: Lijie Wang <[email protected]> AuthorDate: Mon Jul 4 14:04:34 2022 +0800 [hotfix][runtime][tests] Migrate DeclarativeSlotPoolServiceTest and JobMasterTest to JUnit5 --- .../flink/runtime/jobmaster/JobMasterTest.java | 293 +++++++++------------ .../slotpool/DeclarativeSlotPoolServiceTest.java | 85 +++--- 2 files changed, 174 insertions(+), 204 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index cd4c32bf936..2a1b4eaa7a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -108,29 +108,26 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; -import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; @@ -159,24 +156,15 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link JobMaster}. */ -public class JobMasterTest extends TestLogger { +class JobMasterTest { private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0]; - @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir private Path temporaryFolder; private static final Time testingTimeout = Time.seconds(10L); @@ -206,8 +194,8 @@ public class JobMasterTest extends TestLogger { private TestingFatalErrorHandler testingFatalErrorHandler; - @BeforeClass - public static void setupClass() { + @BeforeAll + static void setupAll() { rpcService = new TestingRpcService(); fastHeartbeatServices = @@ -215,8 +203,8 @@ public class JobMasterTest extends TestLogger { heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout, 1); } - @Before - public void setup() throws IOException { + @BeforeEach + void setup() throws IOException { configuration = new Configuration(); haServices = new TestingHighAvailabilityServices(); jobMasterId = JobMasterId.generate(); @@ -230,11 +218,13 @@ public class JobMasterTest extends TestLogger { haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); configuration.setString( - BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobServerOptions.STORAGE_DIRECTORY, + Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()) + .toString()); } - @After - public void teardown() throws Exception { + @AfterEach + void teardown() throws Exception { if (testingFatalErrorHandler != null) { testingFatalErrorHandler.rethrowError(); } @@ -242,8 +232,8 @@ public class JobMasterTest extends TestLogger { rpcService.clearGateways(); } - @AfterClass - public static void teardownClass() { + @AfterAll + static void teardownAll() { if (rpcService != null) { rpcService.stopService(); rpcService = null; @@ -251,7 +241,7 @@ public class JobMasterTest extends TestLogger { } @Test - public void testTaskManagerRegistrationTriggersHeartbeating() throws Exception { + void testTaskManagerRegistrationTriggersHeartbeating() throws Exception { final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>(); final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation(); @@ -293,12 +283,15 @@ public class JobMasterTest extends TestLogger { // wait for the completion of the registration registrationResponse.get(); - assertThat(heartbeatResourceIdFuture.join(), anyOf(nullValue(), equalTo(jmResourceId))); + assertThat(heartbeatResourceIdFuture.join()) + .satisfiesAnyOf( + resourceID -> assertThat(resourceID).isNull(), + resourceID -> assertThat(resourceID).isEqualTo(jmResourceId)); } } @Test - public void testHeartbeatTimeoutWithTaskManager() throws Exception { + void testHeartbeatTimeoutWithTaskManager() throws Exception { runHeartbeatTest( new TestingTaskExecutorGatewayBuilder() .setHeartbeatJobManagerFunction( @@ -352,12 +345,12 @@ public class JobMasterTest extends TestLogger { disconnectedJobManagerFuture.get( testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - assertThat(disconnectedJobManager, equalTo(jobGraph.getJobID())); + assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID()); } } @Test - public void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception { + void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception { runHeartbeatTest( new TestingTaskExecutorGatewayBuilder() .setHeartbeatJobManagerFunction( @@ -378,7 +371,7 @@ public class JobMasterTest extends TestLogger { * for FLINK-12863. */ @Test - public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception { + void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception { final CompletableFuture<Void> assertionFuture = new CompletableFuture<>(); final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation(); @@ -390,13 +383,11 @@ public class JobMasterTest extends TestLogger { (taskManagerId, allocatedSlotReport) -> { try { if (hasReceivedSlotOffers.isTriggered()) { - assertThat( - allocatedSlotReport.getAllocatedSlotInfos(), - hasSize(1)); + assertThat(allocatedSlotReport.getAllocatedSlotInfos()) + .hasSize(1); } else { - assertThat( - allocatedSlotReport.getAllocatedSlotInfos(), - empty()); + assertThat(allocatedSlotReport.getAllocatedSlotInfos()) + .isEmpty(); } } catch (AssertionError e) { assertionFuture.completeExceptionally(e); @@ -448,7 +439,7 @@ public class JobMasterTest extends TestLogger { Collections.singleton(slotOffer), testingTimeout); - assertThat(slotOfferFuture.get(), containsInAnyOrder(slotOffer)); + assertThat(slotOfferFuture.get()).containsExactly(slotOffer); terminateHeartbeatVerification.set(true); @@ -636,7 +627,7 @@ public class JobMasterTest extends TestLogger { } @Test - public void testHeartbeatTimeoutWithResourceManager() throws Exception { + void testHeartbeatTimeoutWithResourceManager() throws Exception { final String resourceManagerAddress = "rm"; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); @@ -685,16 +676,16 @@ public class JobMasterTest extends TestLogger { jobManagerRegistrationFuture.get( testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - assertThat(registrationInformation.f0, Matchers.equalTo(jobMasterId)); - assertThat(registrationInformation.f1, Matchers.equalTo(jmResourceId)); - assertThat(registrationInformation.f2, Matchers.equalTo(jobGraph.getJobID())); + assertThat(registrationInformation.f0).isEqualTo(jobMasterId); + assertThat(registrationInformation.f1).isEqualTo(jmResourceId); + assertThat(registrationInformation.f2).isEqualTo(jobGraph.getJobID()); final JobID disconnectedJobManager = disconnectedJobManagerFuture.get( testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); // heartbeat timeout should trigger disconnect JobManager from ResourceManager - assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID())); + assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID()); // the JobMaster should try to reconnect to the RM registrationAttempts.await(); @@ -702,7 +693,7 @@ public class JobMasterTest extends TestLogger { } @Test - public void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception { + void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception { final String resourceManagerAddress = "rm"; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); @@ -763,7 +754,7 @@ public class JobMasterTest extends TestLogger { 50L); // heartbeat timeout should trigger disconnect JobManager from ResourceManager - assertThat(disconnectedJobManagerFuture.join(), equalTo(jobGraph.getJobID())); + assertThat(disconnectedJobManagerFuture.join()).isEqualTo(jobGraph.getJobID()); // the JobMaster should try to reconnect to the RM registrationAttempts.await(); @@ -775,7 +766,7 @@ public class JobMasterTest extends TestLogger { * submission. */ @Test - public void testRestoringFromSavepoint() throws Exception { + void testRestoringFromSavepoint() throws Exception { // create savepoint data final long savepointId = 42L; @@ -823,15 +814,15 @@ public class JobMasterTest extends TestLogger { final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); - assertThat(savepointCheckpoint, Matchers.notNullValue()); + assertThat(savepointCheckpoint).isNotNull(); - assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId)); + assertThat(savepointCheckpoint.getCheckpointID()).isEqualTo(savepointId); } } /** Tests that an existing checkpoint will have precedence over an savepoint. */ @Test - public void testCheckpointPrecedesSavepointRecovery() throws Exception { + void testCheckpointPrecedesSavepointRecovery() throws Exception { // create savepoint data final long savepointId = 42L; @@ -873,15 +864,15 @@ public class JobMasterTest extends TestLogger { final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); - assertThat(savepointCheckpoint, Matchers.notNullValue()); + assertThat(savepointCheckpoint).isNotNull(); - assertThat(savepointCheckpoint.getCheckpointID(), is(checkpointId)); + assertThat(savepointCheckpoint.getCheckpointID()).isEqualTo(checkpointId); } } /** Tests that we can close an unestablished ResourceManager connection. */ @Test - public void testCloseUnestablishedResourceManagerConnection() throws Exception { + void testCloseUnestablishedResourceManagerConnection() throws Exception { try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withConfiguration(configuration) @@ -927,7 +918,7 @@ public class JobMasterTest extends TestLogger { /** Tests that we continue reconnecting to the latest known RM after a disconnection message. */ @Test - public void testReconnectionAfterDisconnect() throws Exception { + void testReconnectionAfterDisconnect() throws Exception { try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withJobMasterId(jobMasterId) @@ -959,20 +950,20 @@ public class JobMasterTest extends TestLogger { // wait for first registration attempt final JobMasterId firstRegistrationAttempt = registrationsQueue.take(); - assertThat(firstRegistrationAttempt, equalTo(jobMasterId)); + assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId); - assertThat(registrationsQueue.isEmpty(), is(true)); + assertThat(registrationsQueue).isEmpty(); jobMasterGateway.disconnectResourceManager( resourceManagerId, new FlinkException("Test exception")); // wait for the second registration attempt after the disconnect call - assertThat(registrationsQueue.take(), equalTo(jobMasterId)); + assertThat(registrationsQueue.take()).isEqualTo(jobMasterId); } } /** Tests that the a JM connects to the leading RM after regaining leadership. */ @Test - public void testResourceManagerConnectionAfterStart() throws Exception { + void testResourceManagerConnectionAfterStart() throws Exception { try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withJobMasterId(jobMasterId) @@ -998,7 +989,7 @@ public class JobMasterTest extends TestLogger { final JobMasterId firstRegistrationAttempt = registrationQueue.take(); - assertThat(firstRegistrationAttempt, equalTo(jobMasterId)); + assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId); } } @@ -1007,8 +998,8 @@ public class JobMasterTest extends TestLogger { * if this execution fails. */ @Test - @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450 - public void testRequestNextInputSplitWithLocalFailover() throws Exception { + @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // FLINK-21450 + void testRequestNextInputSplitWithLocalFailover() throws Exception { configuration.setString( JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, @@ -1021,7 +1012,7 @@ public class JobMasterTest extends TestLogger { } @Test - public void testRequestNextInputSplitWithGlobalFailover() throws Exception { + void testRequestNextInputSplitWithGlobalFailover() throws Exception { configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); configuration.set( RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(0)); @@ -1096,9 +1087,8 @@ public class JobMasterTest extends TestLogger { } final List<InputSplit> allRequestedInputSplits = flattenCollection(inputSplitsPerTask); - assertThat( - allRequestedInputSplits, - containsInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS))); + assertThat(allRequestedInputSplits) + .containsExactlyInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)); // fail the first execution to trigger a failover jobMasterGateway @@ -1116,12 +1106,11 @@ public class JobMasterTest extends TestLogger { getRemainingInputSplits( getInputSplitSupplier(sourceId, jobMasterGateway, restartedAttemptId)); - assertThat( - inputSplits, - containsInAnyOrder( + assertThat(inputSplits) + .containsExactlyInAnyOrder( expectedRemainingInputSplits .apply(inputSplitsPerTask) - .toArray(EMPTY_TESTING_INPUT_SPLITS))); + .toArray(EMPTY_TESTING_INPUT_SPLITS)); } } @@ -1158,7 +1147,7 @@ public class JobMasterTest extends TestLogger { final JobMasterGateway jobMasterGateway, final JobVertexID jobVertexId) { final List<AccessExecution> executions = getExecutions(jobMasterGateway, jobVertexId); - assertThat(executions, hasSize(greaterThanOrEqualTo(1))); + assertThat(executions.size()).isGreaterThanOrEqualTo(1); return executions.get(0); } @@ -1204,7 +1193,7 @@ public class JobMasterTest extends TestLogger { for (int i = 0; i < numberInputSplits; i++) { final SerializedInputSplit serializedInputSplit = nextInputSplit.get(); - assertThat(serializedInputSplit.isEmpty(), is(false)); + assertThat(serializedInputSplit.isEmpty()).isFalse(); actualInputSplits.add( InstantiationUtil.deserializeObject( @@ -1313,7 +1302,7 @@ public class JobMasterTest extends TestLogger { * call for a finished result partition. */ @Test - public void testRequestPartitionState() throws Exception { + void testRequestPartitionState() throws Exception { final JobGraph producerConsumerJobGraph = producerConsumerJobGraph(); try (final JobMaster jobMaster = new JobMasterBuilder(producerConsumerJobGraph, rpcService) @@ -1347,12 +1336,12 @@ public class JobMasterTest extends TestLogger { testingTaskExecutorGateway, taskManagerLocation); - assertThat(slotOffers, hasSize(1)); + assertThat(slotOffers).hasSize(1); // obtain tdd for the result partition ids final TaskDeploymentDescriptor tdd = tddFuture.get(); - assertThat(tdd.getProducedPartitions(), hasSize(1)); + assertThat(tdd.getProducedPartitions()).hasSize(1); final ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next(); @@ -1372,35 +1361,23 @@ public class JobMasterTest extends TestLogger { CompletableFuture<ExecutionState> partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), partitionId); - assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED)); + assertThat(partitionStateFuture.get()).isEqualTo(ExecutionState.FINISHED); // ask for unknown result partition partitionStateFuture = jobMasterGateway.requestPartitionState( partition.getResultId(), new ResultPartitionID()); - try { - partitionStateFuture.get(); - fail("Expected failure."); - } catch (ExecutionException e) { - assertThat( - ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(), - is(true)); - } + assertThatThrownBy(partitionStateFuture::get) + .hasRootCauseInstanceOf(IllegalArgumentException.class); // ask for wrong intermediate data set id partitionStateFuture = jobMasterGateway.requestPartitionState( new IntermediateDataSetID(), partitionId); - try { - partitionStateFuture.get(); - fail("Expected failure."); - } catch (ExecutionException e) { - assertThat( - ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(), - is(true)); - } + assertThatThrownBy(partitionStateFuture::get) + .hasRootCauseInstanceOf(IllegalArgumentException.class); // ask for "old" execution partitionStateFuture = @@ -1409,15 +1386,8 @@ public class JobMasterTest extends TestLogger { new ResultPartitionID( partition.getPartitionId(), createExecutionAttemptId())); - try { - partitionStateFuture.get(); - fail("Expected failure."); - } catch (ExecutionException e) { - assertThat( - ExceptionUtils.findThrowable(e, PartitionProducerDisposedException.class) - .isPresent(), - is(true)); - } + assertThatThrownBy(partitionStateFuture::get) + .hasRootCauseInstanceOf(PartitionProducerDisposedException.class); } } @@ -1433,7 +1403,7 @@ public class JobMasterTest extends TestLogger { * SavepointFormatType, Time)} is respected. */ @Test - public void testTriggerSavepointTimeout() throws Exception { + void testTriggerSavepointTimeout() throws Exception { final TestingSchedulerNG testingSchedulerNG = TestingSchedulerNG.newBuilder() .setTriggerSavepointFunction( @@ -1460,21 +1430,19 @@ public class JobMasterTest extends TestLogger { jobMasterGateway.triggerSavepoint( "/tmp", false, SavepointFormatType.CANONICAL, RpcUtils.INF_TIMEOUT); - try { - savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit()); - fail(); - } catch (final ExecutionException e) { - final Throwable cause = ExceptionUtils.stripExecutionException(e); - assertThat(cause, instanceOf(TimeoutException.class)); - } + assertThatThrownBy( + () -> + savepointFutureLowTimeout.get( + testingTimeout.getSize(), testingTimeout.getUnit())) + .hasRootCauseInstanceOf(TimeoutException.class); - assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false))); + assertThat(savepointFutureHighTimeout).isNotDone(); } } /** Tests that the TaskExecutor is released if all of its slots have been freed. */ @Test - public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception { + void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception { final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy(); @@ -1515,7 +1483,7 @@ public class JobMasterTest extends TestLogger { taskManagerLocation); // check that we accepted the offered slot - assertThat(slotOffers, hasSize(1)); + assertThat(slotOffers).hasSize(1); final AllocationID allocationId = slotOffers.iterator().next().getAllocationId(); // now fail the allocation and check that we close the connection to the TaskExecutor @@ -1526,14 +1494,13 @@ public class JobMasterTest extends TestLogger { // we should free the slot and then disconnect from the TaskExecutor because we use no // longer slots from it - assertThat(freedSlotFuture.get(), equalTo(allocationId)); - assertThat(disconnectTaskExecutorFuture.get(), equalTo(jobGraph.getJobID())); + assertThat(freedSlotFuture.get()).isEqualTo(allocationId); + assertThat(disconnectTaskExecutorFuture.get()).isEqualTo(jobGraph.getJobID()); } } @Test - public void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() - throws Exception { + void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception { final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); @@ -1583,7 +1550,7 @@ public class JobMasterTest extends TestLogger { taskManagerUnresolvedLocation); // check that we accepted the offered slot - assertThat(slotOffers, hasSize(1)); + assertThat(slotOffers).hasSize(1); final AllocationID allocationId = slotOffers.iterator().next().getAllocationId(); jobMasterGateway.failSlot( @@ -1593,18 +1560,18 @@ public class JobMasterTest extends TestLogger { // we should free the slot, but not disconnect from the TaskExecutor as we still have an // allocated partition - assertThat(freedSlotFuture.get(), equalTo(allocationId)); + assertThat(freedSlotFuture.get()).isEqualTo(allocationId); // trigger some request to guarantee ensure the slotAllocationFailure processing if // complete jobMasterGateway.requestJobStatus(Time.seconds(5)).get(); - assertThat(disconnectTaskExecutorFuture.isDone(), is(false)); + assertThat(disconnectTaskExecutorFuture).isNotDone(); } } /** Tests the updateGlobalAggregate functionality. */ @Test - public void testJobMasterAggregatesValuesCorrectly() throws Exception { + void testJobMasterAggregatesValuesCorrectly() throws Exception { try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withConfiguration(configuration) @@ -1628,27 +1595,27 @@ public class JobMasterTest extends TestLogger { updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction); - assertThat(updateAggregateFuture.get(), equalTo(1)); + assertThat(updateAggregateFuture.get()).isEqualTo(1); updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 2, serializedAggregateFunction); - assertThat(updateAggregateFuture.get(), equalTo(3)); + assertThat(updateAggregateFuture.get()).isEqualTo(3); updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 3, serializedAggregateFunction); - assertThat(updateAggregateFuture.get(), equalTo(6)); + assertThat(updateAggregateFuture.get()).isEqualTo(6); updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 4, serializedAggregateFunction); - assertThat(updateAggregateFuture.get(), equalTo(10)); + assertThat(updateAggregateFuture.get()).isEqualTo(10); updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", 10, serializedAggregateFunction); - assertThat(updateAggregateFuture.get(), equalTo(10)); + assertThat(updateAggregateFuture.get()).isEqualTo(10); updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", 23, serializedAggregateFunction); - assertThat(updateAggregateFuture.get(), equalTo(33)); + assertThat(updateAggregateFuture.get()).isEqualTo(33); } } @@ -1690,7 +1657,7 @@ public class JobMasterTest extends TestLogger { * Tests that the job execution is failed if the TaskExecutor disconnects from the JobMaster. */ @Test - public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { + void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { runJobFailureWhenTaskExecutorTerminatesTest( heartbeatServices, (localTaskManagerLocation, jobMasterGateway) -> @@ -1700,7 +1667,7 @@ public class JobMasterTest extends TestLogger { } @Test - public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { + void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { final TestingHeartbeatServices testingHeartbeatService = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout); @@ -1716,7 +1683,7 @@ public class JobMasterTest extends TestLogger { * actual JobID are not equal. See FLINK-21606. */ @Test - public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception { + void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception { try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) { @@ -1731,12 +1698,12 @@ public class JobMasterTest extends TestLogger { TestingUtils.zeroUUID()), testingTimeout); - assertThat(registrationResponse.get(), instanceOf(JMTMRegistrationRejection.class)); + assertThat(registrationResponse.get()).isInstanceOf(JMTMRegistrationRejection.class); } } @Test - public void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception { + void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception { try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) { @@ -1764,13 +1731,14 @@ public class JobMasterTest extends TestLogger { taskManagerRegistrationInformation, testingTimeout); - assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); - assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); + assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class); + assertThat(secondRegistrationResponse.get()) + .isInstanceOf(JMTMRegistrationSuccess.class); } } @Test - public void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception { + void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception { try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) { @@ -1807,7 +1775,7 @@ public class JobMasterTest extends TestLogger { taskManagerLocation, firstTaskManagerSessionId), testingTimeout); - assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); + assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class); final UUID secondTaskManagerSessionId = UUID.randomUUID(); final CompletableFuture<RegistrationResponse> secondRegistrationResponse = @@ -1819,14 +1787,15 @@ public class JobMasterTest extends TestLogger { secondTaskManagerSessionId), testingTimeout); - assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class)); + assertThat(secondRegistrationResponse.get()) + .isInstanceOf(JMTMRegistrationSuccess.class); // the first TaskExecutor should be disconnected firstTaskExecutorDisconnectedFuture.get(); } } @Test - public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception { + void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception { final CompletableFuture<Void> schedulerTerminationFuture = new CompletableFuture<>(); final TestingSchedulerNG testingSchedulerNG = TestingSchedulerNG.newBuilder() @@ -1845,11 +1814,9 @@ public class JobMasterTest extends TestLogger { final CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync(); - try { - jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS); - fail("Expected TimeoutException because the JobMaster should not terminate."); - } catch (TimeoutException expected) { - } + assertThatThrownBy(() -> jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS)) + .as("Expected TimeoutException because the JobMaster should not terminate.") + .isInstanceOf(TimeoutException.class); schedulerTerminationFuture.complete(null); @@ -1858,7 +1825,7 @@ public class JobMasterTest extends TestLogger { } @Test - public void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception { + void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception { configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); configuration.set( RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofDays(1)); @@ -1899,15 +1866,15 @@ public class JobMasterTest extends TestLogger { == JobStatus.RESTARTING); assertThat( - registerSlotsAtJobMaster( - numberSlots, - jobMasterGateway, - jobGraph.getJobID(), - new TestingTaskExecutorGatewayBuilder() - .setAddress("secondTaskManager") - .createTestingTaskExecutorGateway(), - new LocalUnresolvedTaskManagerLocation()), - hasSize(numberSlots)); + registerSlotsAtJobMaster( + numberSlots, + jobMasterGateway, + jobGraph.getJobID(), + new TestingTaskExecutorGatewayBuilder() + .setAddress("secondTaskManager") + .createTestingTaskExecutorGateway(), + new LocalUnresolvedTaskManagerLocation())) + .hasSize(numberSlots); } } @@ -1953,7 +1920,7 @@ public class JobMasterTest extends TestLogger { jobGraph.getJobID(), taskExecutorGateway, taskManagerUnresolvedLocation); - assertThat(slotOffers, hasSize(1)); + assertThat(slotOffers).hasSize(1); final ExecutionAttemptID executionAttemptId = taskDeploymentFuture.get(); @@ -1974,7 +1941,7 @@ public class JobMasterTest extends TestLogger { .get() .getArchivedExecutionGraph(); - assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); } } @@ -2024,7 +1991,9 @@ public class JobMasterTest extends TestLogger { } private File createSavepoint(long savepointId) throws IOException { - return TestUtils.createSavepointWithOperatorState(temporaryFolder.newFile(), savepointId); + return TestUtils.createSavepointWithOperatorState( + Files.createTempFile(temporaryFolder, UUID.randomUUID().toString(), "").toFile(), + savepointId); } @Nonnull diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java index 6d9c84bddf4..21d47552cfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java @@ -41,13 +41,12 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.clock.SystemClock; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; -import org.junit.Test; +import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; @@ -62,16 +61,10 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link DeclarativeSlotPoolService}. */ -public class DeclarativeSlotPoolServiceTest extends TestLogger { +class DeclarativeSlotPoolServiceTest { private static final JobID jobId = new JobID(); private static final JobMasterId jobMasterId = JobMasterId.generate(); @@ -80,32 +73,34 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { private static final String address = "localhost"; @Test - public void testUnknownTaskManagerRegistration() throws Exception { + void testUnknownTaskManagerRegistration() throws Exception { try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService()) { final ResourceID unknownTaskManager = ResourceID.generate(); - assertFalse( - declarativeSlotPoolService.isTaskManagerRegistered( - unknownTaskManager.getResourceID())); + assertThat( + declarativeSlotPoolService.isTaskManagerRegistered( + unknownTaskManager.getResourceID())) + .isFalse(); } } @Test - public void testKnownTaskManagerRegistration() throws Exception { + void testKnownTaskManagerRegistration() throws Exception { try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService()) { final ResourceID knownTaskManager = ResourceID.generate(); declarativeSlotPoolService.registerTaskManager(knownTaskManager); - assertTrue( - declarativeSlotPoolService.isTaskManagerRegistered( - knownTaskManager.getResourceID())); + assertThat( + declarativeSlotPoolService.isTaskManagerRegistered( + knownTaskManager.getResourceID())) + .isTrue(); } } @Test - public void testReleaseTaskManager() throws Exception { + void testReleaseTaskManager() throws Exception { try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService()) { final ResourceID knownTaskManager = ResourceID.generate(); @@ -113,14 +108,15 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { declarativeSlotPoolService.releaseTaskManager( knownTaskManager, new FlinkException("Test cause")); - assertFalse( - declarativeSlotPoolService.isTaskManagerRegistered( - knownTaskManager.getResourceID())); + assertThat( + declarativeSlotPoolService.isTaskManagerRegistered( + knownTaskManager.getResourceID())) + .isFalse(); } } @Test - public void testSlotOfferingOfUnknownTaskManagerIsIgnored() throws Exception { + void testSlotOfferingOfUnknownTaskManagerIsIgnored() throws Exception { try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService()) { final Collection<SlotOffer> slotOffers = @@ -138,12 +134,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { jobMasterId), slotOffers); - assertThat(acceptedSlots, is(empty())); + assertThat(acceptedSlots).isEmpty(); } } @Test - public void testSlotOfferingOfKnownTaskManager() throws Exception { + void testSlotOfferingOfKnownTaskManager() throws Exception { final AtomicReference<Collection<? extends SlotOffer>> receivedSlotOffers = new AtomicReference<>(); try (DeclarativeSlotPoolService declarativeSlotPoolService = @@ -174,12 +170,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { jobMasterId), slotOffers); - assertThat(receivedSlotOffers.get(), is(slotOffers)); + assertThat(receivedSlotOffers.get()).isEqualTo(slotOffers); } } @Test - public void testConnectToResourceManagerDeclaresRequiredResources() throws Exception { + void testConnectToResourceManagerDeclaresRequiredResources() throws Exception { final Collection<ResourceRequirement> requiredResources = Arrays.asList( ResourceRequirement.create(ResourceProfile.UNKNOWN, 2), @@ -207,14 +203,14 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { final ResourceRequirements resourceRequirements = declaredResourceRequirements.join(); - assertThat(resourceRequirements.getResourceRequirements(), is(requiredResources)); - assertThat(resourceRequirements.getJobId(), is(jobId)); - assertThat(resourceRequirements.getTargetAddress(), is(address)); + assertThat(resourceRequirements.getResourceRequirements()).isEqualTo(requiredResources); + assertThat(resourceRequirements.getJobId()).isEqualTo(jobId); + assertThat(resourceRequirements.getTargetAddress()).isEqualTo(address); } } @Test - public void testCreateAllocatedSlotReport() throws Exception { + void testCreateAllocatedSlotReport() throws Exception { final LocalTaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation(); final LocalTaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation(); final SimpleSlotContext simpleSlotContext2 = createSimpleSlotContext(taskManagerLocation2); @@ -230,14 +226,18 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { declarativeSlotPoolService.createAllocatedSlotReport( taskManagerLocation2.getResourceID()); - assertThat( - allocatedSlotReport.getAllocatedSlotInfos(), - contains(matchesWithSlotContext(simpleSlotContext2))); + assertThat(allocatedSlotReport.getAllocatedSlotInfos()) + .allMatch( + context -> + context.getAllocationId() + .equals(simpleSlotContext2.getAllocationId()) + && context.getSlotIndex() + == simpleSlotContext2.getPhysicalSlotNumber()); } } @Test - public void testFailAllocationReleasesSlot() throws Exception { + void testFailAllocationReleasesSlot() throws Exception { final CompletableFuture<AllocationID> releasedSlot = new CompletableFuture<>(); try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService( @@ -256,12 +256,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { declarativeSlotPoolService.failAllocation( taskManagerId, allocationId, new FlinkException("Test cause")); - assertThat(releasedSlot.join(), is(allocationId)); + assertThat(releasedSlot.join()).isEqualTo(allocationId); } } @Test - public void testFailLastAllocationOfTaskManagerReturnsIt() throws Exception { + void testFailLastAllocationOfTaskManagerReturnsIt() throws Exception { try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService()) { final ResourceID taskManagerId = ResourceID.generate(); @@ -272,14 +272,14 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { taskManagerId, new AllocationID(), new FlinkException("Test cause")); assertThat( - emptyTaskManager.orElseThrow( - () -> new Exception("Expected empty task manager")), - is(taskManagerId)); + emptyTaskManager.orElseThrow( + () -> new Exception("Expected empty task manager"))) + .isEqualTo(taskManagerId); } } @Test - public void testCloseReleasesAllSlotsForAllRegisteredTaskManagers() throws Exception { + void testCloseReleasesAllSlotsForAllRegisteredTaskManagers() throws Exception { final Queue<ResourceID> releasedSlotsFor = new ArrayDeque<>(2); try (DeclarativeSlotPoolService declarativeSlotPoolService = createDeclarativeSlotPoolService( @@ -301,7 +301,8 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger { declarativeSlotPoolService.close(); - assertThat(releasedSlotsFor, containsInAnyOrder(taskManagerResourceIds.toArray())); + assertThat(releasedSlotsFor) + .containsExactlyInAnyOrderElementsOf(taskManagerResourceIds); } }
