This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 269ba498658e0625ac48daa586ae3754b9fc35fe Author: Weijie Guo <[email protected]> AuthorDate: Mon Oct 24 14:00:21 2022 +0800 [FLINK-29234] Migrate JobMasterServiceLeadershipRunnerTest to JUnit5 and AssertJ. --- .../JobMasterServiceLeadershipRunnerTest.java | 242 +++++++++------------ 1 file changed, 98 insertions(+), 144 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index 388264f6cb4..f4c4ebad405 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -42,22 +41,17 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; -import java.time.Duration; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Queue; @@ -65,26 +59,17 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; -import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link JobMasterServiceLeadershipRunner}. */ -public class JobMasterServiceLeadershipRunnerTest extends TestLogger { +class JobMasterServiceLeadershipRunnerTest { private static final Time TESTING_TIMEOUT = Time.seconds(10); - @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - private static JobGraph jobGraph; private TestingLeaderElectionService leaderElectionService; @@ -93,28 +78,28 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { private JobResultStore jobResultStore; - @BeforeClass - public static void setupClass() { + @BeforeAll + static void setupClass() { final JobVertex jobVertex = new JobVertex("Test vertex"); jobVertex.setInvokableClass(NoOpInvokable.class); jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex); } - @Before - public void setup() { + @BeforeEach + void setup() { leaderElectionService = new TestingLeaderElectionService(); jobResultStore = new EmbeddedJobResultStore(); fatalErrorHandler = new TestingFatalErrorHandler(); } - @After - public void tearDown() throws Exception { + @AfterEach + void tearDown() throws Exception { fatalErrorHandler.rethrowError(); } @Test - public void testShutDownSignalsJobAsNotFinished() throws Exception { + void testShutDownSignalsJobAsNotFinished() throws Exception { try (JobManagerRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder().build()) { jobManagerRunner.start(); @@ -122,19 +107,18 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { final CompletableFuture<JobManagerRunnerResult> resultFuture = jobManagerRunner.getResultFuture(); - assertThat(resultFuture.isDone(), is(false)); + assertThat(resultFuture).isNotDone(); jobManagerRunner.closeAsync(); assertJobNotFinished(resultFuture); - assertThat( - jobManagerRunner.getJobMasterGateway(), - FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L))); + assertThat(jobManagerRunner.getJobMasterGateway()) + .failsWithin(5L, TimeUnit.MILLISECONDS); } } @Test - public void testCloseReleasesClassLoaderLease() throws Exception { + void testCloseReleasesClassLoaderLease() throws Exception { final OneShotLatch closeClassLoaderLeaseLatch = new OneShotLatch(); final TestingClassLoaderLease classLoaderLease = @@ -160,7 +144,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { * leadership operation. */ @Test - public void testConcurrentLeadershipOperationsBlockingClose() throws Exception { + void testConcurrentLeadershipOperationsBlockingClose() throws Exception { final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); final JobManagerRunner jobManagerRunner = @@ -183,14 +167,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { leaderElectionService.isLeader(UUID.randomUUID()); // the new leadership should wait first for the suspension to happen - assertThat(leaderFuture.isDone(), is(false)); + assertThat(leaderFuture).isNotDone(); - try { - leaderFuture.get(1L, TimeUnit.MILLISECONDS); - fail("Granted leadership even though the JobMaster has not been suspended."); - } catch (TimeoutException expected) { - // expected - } + assertThat(leaderFuture) + .withFailMessage( + "Granted leadership even though the JobMaster has not been suspended.") + .failsWithin(1L, TimeUnit.MILLISECONDS); terminationFuture.complete(null); @@ -198,7 +180,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { } @Test - public void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded() + void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded() throws Exception { final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>(); final TestingJobMasterServiceProcess testingJobMasterServiceProcess = @@ -219,16 +201,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { new FlinkException("The JobMasterService failed unexpectedly."); resultFuture.completeExceptionally(cause); - assertThat( - jobManagerRunner.getResultFuture(), - FlinkMatchers.futureWillCompleteExceptionally( - cause::equals, - Duration.ofMillis(5L), - "Wrong cause of failed result future")); + assertThat(jobManagerRunner.getResultFuture()) + .failsWithin(5L, TimeUnit.MILLISECONDS) + .withThrowableOfType(ExecutionException.class) + .withCause(cause); } @Test - public void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError() + void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError() throws Exception { final FlinkException testException = new FlinkException("Test exception"); @@ -251,9 +231,9 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { final JobManagerRunnerResult jobManagerRunnerResult = jobManagerRunner.getResultFuture().join(); - assertTrue(jobManagerRunnerResult.isInitializationFailure()); + assertThat(jobManagerRunnerResult.isInitializationFailure()).isTrue(); - assertThat(jobManagerRunnerResult.getInitializationFailure(), containsCause(testException)); + assertThat(jobManagerRunnerResult.getInitializationFailure()).isEqualTo(testException); } @Nonnull @@ -269,7 +249,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { } @Test - public void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception { + void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception { final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); final JobManagerRunner jobManagerRunner = @@ -287,11 +267,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { jobManagerRunner.closeAsync().join(); assertJobNotFinished(jobManagerRunner.getResultFuture()); - assertTrue(terminationFuture.isDone()); + assertThat(terminationFuture).isDone(); } @Test - public void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception { + void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception { final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); final JobManagerRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() @@ -307,11 +287,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { leaderElectionService.notLeader(); - assertTrue(terminationFuture.isDone()); + assertThat(terminationFuture).isDone(); } @Test - public void testCancellationIsForwardedToJobMasterService() throws Exception { + void testCancellationIsForwardedToJobMasterService() throws Exception { final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<>(); final JobManagerRunner jobManagerRunner = @@ -330,7 +310,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { CompletableFuture<Acknowledge> cancellationFuture = jobManagerRunner.cancel(TESTING_TIMEOUT); - assertThat(cancellationFuture.isDone(), is(false)); + assertThat(cancellationFuture).isNotDone(); AtomicBoolean cancelCalled = new AtomicBoolean(false); JobMasterGateway jobMasterGateway = @@ -346,11 +326,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { // assert that cancellation future completes when cancellation completes. cancellationFuture.get(); - assertThat(cancelCalled.get(), is(true)); + assertThat(cancelCalled).isTrue(); } @Test - public void testJobInformationOperationsDuringInitialization() throws Exception { + void testJobInformationOperationsDuringInitialization() throws Exception { final JobManagerRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() @@ -374,28 +354,26 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { private static void assertInitializingStates(JobManagerRunner jobManagerRunner) throws ExecutionException, InterruptedException { + assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get()) + .isEqualTo(JobStatus.INITIALIZING); + assertThat(jobManagerRunner.getResultFuture()).isNotDone(); assertThat( - jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), - is(JobStatus.INITIALIZING)); - assertThat(jobManagerRunner.getResultFuture().isDone(), is(false)); - assertThat( - jobManagerRunner - .requestJob(TESTING_TIMEOUT) - .get() - .getArchivedExecutionGraph() - .getState(), - is(JobStatus.INITIALIZING)); - - assertThat( - jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus(), - is(JobStatus.INITIALIZING)); + jobManagerRunner + .requestJob(TESTING_TIMEOUT) + .get() + .getArchivedExecutionGraph() + .getState()) + .isEqualTo(JobStatus.INITIALIZING); + + assertThat(jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus()) + .isEqualTo(JobStatus.INITIALIZING); } // It can happen that a series of leadership operations happens while the JobMaster // initialization is blocked. This test is to ensure that we are not starting-stopping // JobMasters for all pending leadership grants, but only for the latest. @Test - public void testSkippingOfEnqueuedLeadershipOperations() throws Exception { + void testSkippingOfEnqueuedLeadershipOperations() throws Exception { final CompletableFuture<Void> firstTerminationFuture = new CompletableFuture<>(); final CompletableFuture<Void> secondTerminationFuture = new CompletableFuture<>(); @@ -417,9 +395,8 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { // first leadership assignment to get into blocking initialization leaderElectionService.isLeader(UUID.randomUUID()); - assertThat( - jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), - is(JobStatus.INITIALIZING)); + assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get()) + .isEqualTo(JobStatus.INITIALIZING); // we are now blocked on the initialization, enqueue some operations: for (int i = 0; i < 10; i++) { @@ -432,11 +409,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { jobManagerRunner.closeAsync(); // this ensures that the second JobMasterServiceProcess is taken - assertTrue(secondTerminationFuture.isDone()); + assertThat(secondTerminationFuture).isDone(); } @Test - public void testCancellationFailsWhenInitializationFails() throws Exception { + void testCancellationFailsWhenInitializationFails() throws Exception { final FlinkException testException = new FlinkException("test exception"); runCancellationFailsTest( resultFuture -> @@ -447,13 +424,13 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { } @Test - public void testCancellationFailsWhenExceptionOccurs() throws Exception { + void testCancellationFailsWhenExceptionOccurs() throws Exception { final FlinkException testException = new FlinkException("test exception"); runCancellationFailsTest(resultFuture -> resultFuture.completeExceptionally(testException)); } - public void runCancellationFailsTest( - Consumer<CompletableFuture<JobManagerRunnerResult>> testAction) throws Exception { + void runCancellationFailsTest(Consumer<CompletableFuture<JobManagerRunnerResult>> testAction) + throws Exception { final CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture = new CompletableFuture<>(); final JobManagerRunner jobManagerRunner = @@ -472,25 +449,18 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { leaderElectionService.isLeader(UUID.randomUUID()); // cancel while initializing - assertThat( - jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), - is(JobStatus.INITIALIZING)); + assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get()) + .isEqualTo(JobStatus.INITIALIZING); CompletableFuture<Acknowledge> cancelFuture = jobManagerRunner.cancel(TESTING_TIMEOUT); - assertThat(cancelFuture.isDone(), is(false)); + assertThat(cancelFuture).isNotDone(); testAction.accept(jobManagerRunnerResultFuture); - - try { - cancelFuture.get(); - fail(); - } catch (Throwable t) { - assertThat(t, containsMessage("Cancellation failed.")); - } + assertThatThrownBy(cancelFuture::get).hasMessageContaining("Cancellation failed."); } @Test - public void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception { + void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception { final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>(); final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() @@ -510,11 +480,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { JobManagerRunnerResult.forSuccess( createFailedExecutionGraphInfo(new FlinkException("test exception")))); - assertThat(jobManagerRunner.getResultFuture(), willNotComplete(Duration.ofMillis(5L))); + assertThat(jobManagerRunner.getResultFuture()).failsWithin(5L, TimeUnit.MILLISECONDS); } @Test - public void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception { + void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception { final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( @@ -532,13 +502,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { leaderElectionService.notLeader(); - assertThat( - jobMasterGateway, - FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L))); + assertThat(jobMasterGateway).failsWithin(5L, TimeUnit.MILLISECONDS); } @Test - public void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception { + void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception { final CompletableFuture<String> leaderAddressFuture = new CompletableFuture<>(); final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() @@ -557,23 +525,22 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { leaderAddressFuture.complete("foobar"); - assertThat(leaderFuture, willNotComplete(Duration.ofMillis(5L))); + assertThat(leaderFuture).failsWithin(5L, TimeUnit.MILLISECONDS); } @Test - public void testInitialJobStatusIsInitializing() throws Exception { + void testInitialJobStatusIsInitializing() throws Exception { final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder().build(); jobManagerRunner.start(); - assertThat( - jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(), - is(JobStatus.INITIALIZING)); + assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join()) + .isEqualTo(JobStatus.INITIALIZING); } @Test - public void testCancellationChangesJobStatusToCancelling() throws Exception { + void testCancellationChangesJobStatusToCancelling() throws Exception { final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder().build(); @@ -581,13 +548,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { jobManagerRunner.cancel(TESTING_TIMEOUT); - assertThat( - jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(), - is(JobStatus.CANCELLING)); + assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join()) + .isEqualTo(JobStatus.CANCELLING); } @Test - public void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception { + void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception { final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder().build(); @@ -598,14 +564,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { leaderElectionService.isLeader(UUID.randomUUID()); leaderElectionService.notLeader(); - assertThat( - jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(), - is(JobStatus.INITIALIZING)); + assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join()) + .isEqualTo(JobStatus.INITIALIZING); } @Test - public void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() - throws Exception { + void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception { final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() @@ -624,19 +588,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { final FlinkException testException = new FlinkException("Test exception"); terminationFuture.completeExceptionally(testException); - assertThat( - jobManagerRunner.getResultFuture(), - FlinkMatchers.futureWillCompleteExceptionally( - cause -> - ExceptionUtils.findThrowable(cause, testException::equals) - .isPresent(), - Duration.ofMillis(5L), - "Result future should be completed exceptionally.")); + assertThat(jobManagerRunner.getResultFuture()) + .failsWithin(5L, TimeUnit.MILLISECONDS) + .withThrowableOfType(ExecutionException.class) + .satisfies(cause -> assertThat(cause).hasRootCause(testException)); } @Test - public void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() - throws Exception { + void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throws Exception { final FlinkRuntimeException testException = new FlinkRuntimeException("Test exception"); final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() @@ -653,18 +612,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { leaderElectionService.isLeader(UUID.randomUUID()); - assertThat( - jobManagerRunner.getResultFuture(), - FlinkMatchers.futureWillCompleteExceptionally( - cause -> - ExceptionUtils.findThrowable(cause, testException::equals) - .isPresent(), - Duration.ofMillis(5L), - "Result future should be completed exceptionally.")); + assertThat(jobManagerRunner.getResultFuture()) + .failsWithin(5L, TimeUnit.MILLISECONDS) + .withThrowableOfType(ExecutionException.class) + .satisfies(cause -> assertThat(cause).hasRootCause(testException)); } @Test - public void testJobAlreadyDone() throws Exception { + void testJobAlreadyDone() throws Exception { final JobID jobId = new JobID(); final JobResult jobResult = TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN); @@ -683,21 +638,20 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger { jobManagerRunner.getResultFuture(); JobManagerRunnerResult result = resultFuture.get(); - assertEquals( - JobStatus.FAILED, - result.getExecutionGraphInfo().getArchivedExecutionGraph().getState()); + assertThat(result.getExecutionGraphInfo().getArchivedExecutionGraph().getState()) + .isEqualTo(JobStatus.FAILED); } } private void assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> resultFuture) throws ExecutionException, InterruptedException { final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get(); - assertEquals( - jobManagerRunnerResult - .getExecutionGraphInfo() - .getArchivedExecutionGraph() - .getState(), - JobStatus.SUSPENDED); + assertThat( + jobManagerRunnerResult + .getExecutionGraphInfo() + .getArchivedExecutionGraph() + .getState()) + .isEqualTo(JobStatus.SUSPENDED); } public JobMasterServiceLeadershipRunnerBuilder newJobMasterServiceLeadershipRunnerBuilder() {
