This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0351f88cea26dc57ca7aeb78564a8e2acffc505a Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Wed Feb 15 18:30:03 2023 +0800 [hotfix] Migrate DefaultSchedulerTest to Junit5 and AssertJ. --- .../runtime/scheduler/DefaultSchedulerTest.java | 453 ++++++++++----------- .../ExceptionHistoryEntryTestingUtils.java | 90 ++++ 2 files changed, 308 insertions(+), 235 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 59122521cb2..49ba055732b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -75,7 +75,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest; -import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryMatcher; +import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryTestingUtils; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; @@ -99,16 +99,9 @@ import org.apache.flink.util.concurrent.ScheduledExecutor; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; -import org.hamcrest.collection.IsEmptyIterable; -import org.hamcrest.collection.IsIterableContainingInOrder; -import org.hamcrest.collection.IsIterableWithSize; -import org.hamcrest.core.Is; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import java.time.Duration; @@ -141,28 +134,14 @@ import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableChe import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link DefaultScheduler}. */ public class DefaultSchedulerTest extends TestLogger { private static final int TIMEOUT_MS = 1000; - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - private final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); @@ -188,8 +167,8 @@ public class DefaultSchedulerTest extends TestLogger { private Time timeout; - @Before - public void setUp() throws Exception { + @BeforeEach + void setUp() { executor = Executors.newSingleThreadExecutor(); scheduledExecutorService = new DirectScheduledExecutorService(); @@ -211,8 +190,8 @@ public class DefaultSchedulerTest extends TestLogger { timeout = Time.seconds(60); } - @After - public void tearDown() throws Exception { + @AfterEach + void tearDown() { if (scheduledExecutorService != null) { ExecutorUtils.gracefulShutdown( TIMEOUT_MS, TimeUnit.MILLISECONDS, scheduledExecutorService); @@ -224,7 +203,7 @@ public class DefaultSchedulerTest extends TestLogger { } @Test - public void startScheduling() { + void startScheduling() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); @@ -234,11 +213,11 @@ public class DefaultSchedulerTest extends TestLogger { testExecutionOperations.getDeployedVertices(); final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); - assertThat(deployedExecutionVertices, contains(executionVertexId)); + assertThat(deployedExecutionVertices).contains(executionVertexId); } @Test - public void testCorrectSettingOfInitializationTimestamp() { + void testCorrectSettingOfInitializationTimestamp() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); @@ -248,20 +227,18 @@ public class DefaultSchedulerTest extends TestLogger { executionGraphInfo.getArchivedExecutionGraph(); // ensure all statuses are set in the ExecutionGraph - assertThat( - archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING), greaterThan(0L)); - assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED), greaterThan(0L)); - assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING), greaterThan(0L)); + assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)) + .isGreaterThan(0L); + assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)).isGreaterThan(0L); + assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING)).isGreaterThan(0L); // ensure correct order - assertThat( - archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING) - <= archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED), - Is.is(true)); + assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)) + .isLessThanOrEqualTo(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)); } @Test - public void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception { + void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception { final JobGraph jobGraph = singleJobVertexJobGraph(4); final JobVertexID onlyJobVertexId = getOnlyJobVertex(jobGraph).getID(); @@ -285,17 +262,17 @@ public class DefaultSchedulerTest extends TestLogger { new ExecutionVertexID(onlyJobVertexId, 3)); schedulingStrategy.schedule(verticesToSchedule); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(testExecutionOperations.getDeployedVertices()).isEmpty(); testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0)); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(testExecutionOperations.getDeployedVertices()).isEmpty(); testExecutionSlotAllocator.completePendingRequests(); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(4)); + assertThat(testExecutionOperations.getDeployedVertices()).hasSize(4); } @Test - public void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws Exception { + void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws Exception { final JobGraph jobGraph = singleJobVertexJobGraph(10); final JobVertexID onlyJobVertexId = getOnlyJobVertex(jobGraph).getID(); @@ -321,11 +298,11 @@ public class DefaultSchedulerTest extends TestLogger { final List<ExecutionVertexID> deployedExecutionVertices = testExecutionOperations.getDeployedVertices(); - assertEquals(desiredScheduleOrder, deployedExecutionVertices); + assertThat(desiredScheduleOrder).isEqualTo(deployedExecutionVertices); } @Test - public void restartAfterDeploymentFails() { + void restartAfterDeploymentFails() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); @@ -340,11 +317,11 @@ public class DefaultSchedulerTest extends TestLogger { testExecutionOperations.getDeployedVertices(); final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); - assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId)); + assertThat(deployedExecutionVertices).contains(executionVertexId, executionVertexId); } @Test - public void restartFailedTask() { + void restartFailedTask() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); @@ -366,22 +343,22 @@ public class DefaultSchedulerTest extends TestLogger { final List<ExecutionVertexID> deployedExecutionVertices = testExecutionOperations.getDeployedVertices(); final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); - assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId)); + assertThat(deployedExecutionVertices).contains(executionVertexId, executionVertexId); } @Test - public void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() { + void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); final TaskExecutionState taskExecutionState = createFailedTaskExecutionState(createExecutionAttemptId()); - assertFalse(scheduler.updateTaskExecutionState(taskExecutionState)); + assertThat(scheduler.updateTaskExecutionState(taskExecutionState)).isFalse(); } @Test - public void failJobIfCannotRestart() throws Exception { + void failJobIfCannotRestart() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); testRestartBackoffTimeStrategy.setCanRestart(false); @@ -402,11 +379,11 @@ public class DefaultSchedulerTest extends TestLogger { waitForTermination(scheduler); final JobStatus jobStatus = scheduler.requestJobStatus(); - assertThat(jobStatus, is(equalTo(JobStatus.FAILED))); + assertThat(jobStatus).isEqualTo(JobStatus.FAILED); } @Test - public void failJobIfNotEnoughResources() throws Exception { + void failJobIfNotEnoughResources() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); testRestartBackoffTimeStrategy.setCanRestart(false); testExecutionSlotAllocator.disableAutoCompletePendingRequests(); @@ -417,7 +394,7 @@ public class DefaultSchedulerTest extends TestLogger { waitForTermination(scheduler); final JobStatus jobStatus = scheduler.requestJobStatus(); - assertThat(jobStatus, is(equalTo(JobStatus.FAILED))); + assertThat(jobStatus).isEqualTo(JobStatus.FAILED); Throwable failureCause = scheduler @@ -426,24 +403,24 @@ public class DefaultSchedulerTest extends TestLogger { .getFailureInfo() .getException() .deserializeError(DefaultSchedulerTest.class.getClassLoader()); - assertTrue(findThrowable(failureCause, NoResourceAvailableException.class).isPresent()); - assertTrue( - findThrowableWithMessage( + assertThat(findThrowable(failureCause, NoResourceAvailableException.class)).isPresent(); + assertThat( + findThrowableWithMessage( failureCause, - "Could not allocate the required slot within slot request timeout.") - .isPresent()); - assertThat(jobStatus, is(equalTo(JobStatus.FAILED))); + "Could not allocate the required slot within slot request timeout.")) + .isPresent(); + assertThat(jobStatus).isEqualTo(JobStatus.FAILED); } @Test - public void restartVerticesOnSlotAllocationTimeout() throws Exception { + void restartVerticesOnSlotAllocationTimeout() throws Exception { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); testRestartVerticesOnFailuresInScheduling( vid -> testExecutionSlotAllocator.timeoutPendingRequest(vid)); } @Test - public void restartVerticesOnAssignedSlotReleased() throws Exception { + void restartVerticesOnAssignedSlotReleased() throws Exception { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); testRestartVerticesOnFailuresInScheduling( vid -> { @@ -481,7 +458,7 @@ public class DefaultSchedulerTest extends TestLogger { final ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1); schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22)); - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(4)); + assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(4); actionsToTriggerTaskFailure.accept(vid11); @@ -498,20 +475,18 @@ public class DefaultSchedulerTest extends TestLogger { // ev11 and ev21 needs to be restarted because it is pipelined region failover and // they are in the same region. ev12 and ev22 will not be affected - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(2)); - assertThat(ev11.getExecutionState(), is(ExecutionState.FAILED)); - assertThat(ev21.getExecutionState(), is(ExecutionState.CANCELED)); - assertThat(ev12.getExecutionState(), is(ExecutionState.SCHEDULED)); - assertThat(ev22.getExecutionState(), is(ExecutionState.SCHEDULED)); + assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2); + assertThat(ev11.getExecutionState()).isEqualTo(ExecutionState.FAILED); + assertThat(ev21.getExecutionState()).isEqualTo(ExecutionState.CANCELED); + assertThat(ev12.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED); + assertThat(ev22.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED); taskRestartExecutor.triggerScheduledTasks(); - assertThat( - schedulingStrategy.getReceivedVerticesToRestart(), - containsInAnyOrder(vid11, vid21)); + assertThat(schedulingStrategy.getReceivedVerticesToRestart()).contains(vid11, vid21); } @Test - public void skipDeploymentIfVertexVersionOutdated() { + void skipDeploymentIfVertexVersionOutdated() { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); @@ -540,16 +515,14 @@ public class DefaultSchedulerTest extends TestLogger { testExecutionSlotAllocator.enableAutoCompletePendingRequests(); taskRestartExecutor.triggerScheduledTasks(); - assertThat( - testExecutionOperations.getDeployedVertices(), - containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId)); - assertThat( - scheduler.requestJob().getArchivedExecutionGraph().getState(), - is(equalTo(JobStatus.RUNNING))); + assertThat(testExecutionOperations.getDeployedVertices()) + .contains(sourceExecutionVertexId, sinkExecutionVertexId); + assertThat(scheduler.requestJob().getArchivedExecutionGraph().getState()) + .isEqualTo(JobStatus.RUNNING); } @Test - public void releaseSlotIfVertexVersionOutdated() { + void releaseSlotIfVertexVersionOutdated() { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); @@ -561,11 +534,11 @@ public class DefaultSchedulerTest extends TestLogger { executionVertexVersioner.recordModification(onlyExecutionVertexId); testExecutionSlotAllocator.completePendingRequests(); - assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(1)); + assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(1); } @Test - public void vertexIsResetBeforeRestarted() throws Exception { + void vertexIsResetBeforeRestarted() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final TestSchedulingStrategy.Factory schedulingStrategyFactory = @@ -597,12 +570,12 @@ public class DefaultSchedulerTest extends TestLogger { taskRestartExecutor.triggerScheduledTasks(); - assertThat(schedulingStrategy.getReceivedVerticesToRestart(), hasSize(1)); - assertThat(onlySchedulingVertex.getState(), is(equalTo(ExecutionState.CREATED))); + assertThat(schedulingStrategy.getReceivedVerticesToRestart()).hasSize(1); + assertThat(onlySchedulingVertex.getState()).isEqualTo(ExecutionState.CREATED); } @Test - public void scheduleOnlyIfVertexIsCreated() throws Exception { + void scheduleOnlyIfVertexIsCreated() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final TestSchedulingStrategy.Factory schedulingStrategyFactory = @@ -625,16 +598,16 @@ public class DefaultSchedulerTest extends TestLogger { schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId)); // The scheduling of a non-CREATED vertex will result in IllegalStateException - try { - schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId)); - fail("IllegalStateException should happen"); - } catch (IllegalStateException e) { - // expected exception - } + assertThatThrownBy( + () -> + schedulingStrategy.schedule( + Collections.singletonList(onlySchedulingVertexId)), + "IllegalStateException should happen") + .isInstanceOf(IllegalStateException.class); } @Test - public void handleGlobalFailure() { + void handleGlobalFailure() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); @@ -658,7 +631,7 @@ public class DefaultSchedulerTest extends TestLogger { final List<ExecutionVertexID> deployedExecutionVertices = testExecutionOperations.getDeployedVertices(); final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); - assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId)); + assertThat(deployedExecutionVertices).contains(executionVertexId, executionVertexId); } /** @@ -668,7 +641,7 @@ public class DefaultSchedulerTest extends TestLogger { * updates. */ @Test - public void handleGlobalFailureWithLocalFailure() { + void handleGlobalFailureWithLocalFailure() { final JobGraph jobGraph = singleJobVertexJobGraph(2); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); enableCheckpointing(jobGraph); @@ -706,24 +679,26 @@ public class DefaultSchedulerTest extends TestLogger { new ExecutionVertexID(onlyJobVertex.getID(), 0); final ExecutionVertexID executionVertexId1 = new ExecutionVertexID(onlyJobVertex.getID(), 1); - assertThat( - "The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.", - testExecutionOperations.getDeployedVertices(), - contains( + assertThat(testExecutionOperations.getDeployedVertices()) + .withFailMessage( + "The " + + "execution vertices should be deployed in a specific order reflecting the " + + "scheduling start and the global fail-over afterwards.") + .contains( executionVertexId0, executionVertexId1, executionVertexId0, - executionVertexId1)); + executionVertexId1); } @Test - public void testStartingCheckpointSchedulerAfterExecutionGraphFinished() { + void testStartingCheckpointSchedulerAfterExecutionGraphFinished() { assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished( SchedulerBase::startCheckpointScheduler); } @Test - public void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() { + void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() { assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished( SchedulerBase::stopCheckpointScheduler); } @@ -734,7 +709,7 @@ public class DefaultSchedulerTest extends TestLogger { enableCheckpointing(jobGraph); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); - assertThat(scheduler.getCheckpointCoordinator(), is(notNullValue())); + assertThat(scheduler.getCheckpointCoordinator()).isNotNull(); scheduler.updateTaskExecutionState( new TaskExecutionState( Iterables.getOnlyElement( @@ -743,13 +718,13 @@ public class DefaultSchedulerTest extends TestLogger { .getAttemptId(), ExecutionState.FINISHED)); - assertThat(scheduler.getCheckpointCoordinator(), is(nullValue())); + assertThat(scheduler.getCheckpointCoordinator()).isNull(); callSchedulingOperation.accept(scheduler); - assertThat(scheduler.getCheckpointCoordinator(), is(nullValue())); + assertThat(scheduler.getCheckpointCoordinator()).isNull(); } @Test - public void vertexIsNotAffectedByOutdatedDeployment() { + void vertexIsNotAffectedByOutdatedDeployment() { final JobGraph jobGraph = singleJobVertexJobGraph(2); testExecutionSlotAllocator.disableAutoCompletePendingRequests(); @@ -779,11 +754,11 @@ public class DefaultSchedulerTest extends TestLogger { createFailedTaskExecutionState(v2.getCurrentExecutionAttempt().getAttemptId())); // v1 should not be affected - assertThat(sv1.getState(), is(equalTo(ExecutionState.SCHEDULED))); + assertThat(sv1.getState()).isEqualTo(ExecutionState.SCHEDULED); } @Test - public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { + void abortPendingCheckpointsWhenRestartingTasks() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); enableCheckpointing(jobGraph); @@ -809,15 +784,15 @@ public class DefaultSchedulerTest extends TestLogger { checkpointCoordinator.triggerCheckpoint(false); checkpointTriggeredLatch.await(); - assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(1))); + assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne(); scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId)); taskRestartExecutor.triggerScheduledTasks(); - assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); + assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero(); } @Test - public void restoreStateWhenRestartingTasks() throws Exception { + void restoreStateWhenRestartingTasks() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); enableCheckpointing(jobGraph); @@ -854,11 +829,11 @@ public class DefaultSchedulerTest extends TestLogger { scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId)); taskRestartExecutor.triggerScheduledTasks(); - assertThat(masterHook.getRestoreCount(), is(equalTo(1))); + assertThat(masterHook.getRestoreCount()).isOne(); } @Test - public void failGlobalWhenRestoringStateFails() throws Exception { + void failGlobalWhenRestoringStateFails() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); enableCheckpointing(jobGraph); @@ -902,17 +877,17 @@ public class DefaultSchedulerTest extends TestLogger { List<ExecutionVertexID> deployedExecutionVertices = testExecutionOperations.getDeployedVertices(); final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); - assertThat(deployedExecutionVertices, contains(executionVertexId)); + assertThat(deployedExecutionVertices).contains(executionVertexId); // a global failure should be triggered on state restore failure masterHook.disableFailOnRestore(); taskRestartExecutor.triggerScheduledTasks(); deployedExecutionVertices = testExecutionOperations.getDeployedVertices(); - assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId)); + assertThat(deployedExecutionVertices).contains(executionVertexId, executionVertexId); } @Test - public void failJobWillIncrementVertexVersions() { + void failJobWillIncrementVertexVersions() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); final ExecutionVertexID onlyExecutionVertexId = @@ -924,11 +899,11 @@ public class DefaultSchedulerTest extends TestLogger { scheduler.failJob(new FlinkException("Test failure."), System.currentTimeMillis()); - assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); + assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue(); } @Test - public void cancelJobWillIncrementVertexVersions() { + void cancelJobWillIncrementVertexVersions() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); final ExecutionVertexID onlyExecutionVertexId = @@ -940,11 +915,11 @@ public class DefaultSchedulerTest extends TestLogger { scheduler.cancel(); - assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); + assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue(); } @Test - public void suspendJobWillIncrementVertexVersions() throws Exception { + void suspendJobWillIncrementVertexVersions() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); final ExecutionVertexID onlyExecutionVertexId = @@ -956,11 +931,11 @@ public class DefaultSchedulerTest extends TestLogger { scheduler.close(); - assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); + assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue(); } @Test - public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() { + void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() { final JobGraph jobGraph = singleJobVertexJobGraph(2); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); @@ -988,13 +963,13 @@ public class DefaultSchedulerTest extends TestLogger { taskRestartExecutor.triggerNonPeriodicScheduledTask(); final JobStatus jobStatusAfterRestarts = scheduler.requestJobStatus(); - assertThat(jobStatusAfterFirstFailure, equalTo(JobStatus.RESTARTING)); - assertThat(jobStatusWithPendingRestarts, equalTo(JobStatus.RESTARTING)); - assertThat(jobStatusAfterRestarts, equalTo(JobStatus.RUNNING)); + assertThat(jobStatusAfterFirstFailure).isEqualTo(JobStatus.RESTARTING); + assertThat(jobStatusWithPendingRestarts).isEqualTo(JobStatus.RESTARTING); + assertThat(jobStatusAfterRestarts).isEqualTo(JobStatus.RUNNING); } @Test - public void cancelWhileRestartingShouldWaitForRunningTasks() { + void cancelWhileRestartingShouldWaitForRunningTasks() { final JobGraph jobGraph = singleJobVertexJobGraph(2); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); final SchedulingTopology topology = scheduler.getSchedulingTopology(); @@ -1022,13 +997,13 @@ public class DefaultSchedulerTest extends TestLogger { new TaskExecutionState( attemptId2, ExecutionState.CANCELED, new RuntimeException("expected"))); - assertThat(vertex2StateAfterCancel, is(equalTo(ExecutionState.CANCELING))); - assertThat(statusAfterCancelWhileRestarting, is(equalTo(JobStatus.CANCELLING))); - assertThat(scheduler.requestJobStatus(), is(equalTo(JobStatus.CANCELED))); + assertThat(vertex2StateAfterCancel).isEqualTo(ExecutionState.CANCELING); + assertThat(statusAfterCancelWhileRestarting).isEqualTo(JobStatus.CANCELLING); + assertThat(scheduler.requestJobStatus()).isEqualTo(JobStatus.CANCELED); } @Test - public void failureInfoIsSetAfterTaskFailure() { + void failureInfoIsSetAfterTaskFailure() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); @@ -1048,12 +1023,12 @@ public class DefaultSchedulerTest extends TestLogger { final ErrorInfo failureInfo = scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo(); - assertThat(failureInfo, is(notNullValue())); - assertThat(failureInfo.getExceptionAsString(), containsString(exceptionMessage)); + assertThat(failureInfo).isNotNull(); + assertThat(failureInfo.getExceptionAsString()).contains(exceptionMessage); } @Test - public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception { + void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception { final JobGraph jobGraph = singleJobVertexJobGraph(2); testExecutionSlotAllocator.disableAutoCompletePendingRequests(); @@ -1073,7 +1048,7 @@ public class DefaultSchedulerTest extends TestLogger { .iterator(); ArchivedExecutionVertex v1 = vertexIterator.next(); - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(2)); + assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2); final String exceptionMessage = "expected exception"; scheduler.updateTaskExecutionState( @@ -1090,13 +1065,13 @@ public class DefaultSchedulerTest extends TestLogger { .iterator(); v1 = vertexIterator.next(); ArchivedExecutionVertex v2 = vertexIterator.next(); - assertThat(v1.getExecutionState(), is(ExecutionState.FAILED)); - assertThat(v2.getExecutionState(), is(ExecutionState.CANCELED)); - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); + assertThat(v1.getExecutionState()).isEqualTo(ExecutionState.FAILED); + assertThat(v2.getExecutionState()).isEqualTo(ExecutionState.CANCELED); + assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty(); } @Test - public void pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots() + void pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots() throws Exception { final int parallelism = 10; final JobGraph jobGraph = sourceSinkJobGraph(parallelism); @@ -1118,12 +1093,11 @@ public class DefaultSchedulerTest extends TestLogger { testExecutionSlotAllocator.getPendingRequests().values().stream() .map(ExecutionSlotAssignment::getLogicalSlotFuture) .collect(Collectors.toSet()); - assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2)); + assertThat(pendingLogicalSlotFutures).hasSize(parallelism * 2); testExecutionSlotAllocator.completePendingRequest(ev1.getID()); - assertThat( - pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count(), - is(1L)); + assertThat(pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count()) + .isEqualTo(1L); final String exceptionMessage = "expected exception"; scheduler.updateTaskExecutionState( @@ -1132,21 +1106,23 @@ public class DefaultSchedulerTest extends TestLogger { ExecutionState.FAILED, new RuntimeException(exceptionMessage))); - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); + assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty(); // the failed task will return its slot before triggering failover. And the slot // will be returned and re-assigned to another task which is waiting for a slot. // failover will be triggered after that and the re-assigned slot will be returned // once the attached task is canceled, but the slot will not be assigned to other // tasks which are identified to be restarted soon. - assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(2)); + assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(2); assertThat( - pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count(), - is(parallelism * 2L - 2L)); + pendingLogicalSlotFutures.stream() + .filter(CompletableFuture::isCancelled) + .count()) + .isEqualTo(parallelism * 2L - 2L); } @Test - public void testExceptionHistoryWithGlobalFailOver() { + void testExceptionHistoryWithGlobalFailOver() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); @@ -1171,19 +1147,21 @@ public class DefaultSchedulerTest extends TestLogger { final Iterable<RootExceptionHistoryEntry> actualExceptionHistory = scheduler.getExceptionHistory(); - assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1)); + assertThat(actualExceptionHistory).hasSize(1); final RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next(); + assertThat( - failure, - ExceptionHistoryEntryMatcher.matchesGlobalFailure( - expectedException, - scheduler.getExecutionGraph().getFailureInfo().getTimestamp())); - assertThat(failure.getConcurrentExceptions(), IsEmptyIterable.emptyIterable()); + ExceptionHistoryEntryTestingUtils.matchesGlobalFailure( + failure, + expectedException, + scheduler.getExecutionGraph().getFailureInfo().getTimestamp())) + .isTrue(); + assertThat(failure.getConcurrentExceptions()).isEmpty(); } @Test - public void testExceptionHistoryWithRestartableFailure() { + void testExceptionHistoryWithRestartableFailure() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); @@ -1229,20 +1207,26 @@ public class DefaultSchedulerTest extends TestLogger { scheduler.getExceptionHistory(); // assert restarted attempt + assertThat(actualExceptionHistory).hasSize(2); + Iterator<RootExceptionHistoryEntry> iterator = actualExceptionHistory.iterator(); + RootExceptionHistoryEntry entry0 = iterator.next(); assertThat( - actualExceptionHistory, - IsIterableContainingInOrder.contains( - ExceptionHistoryEntryMatcher.matchesFailure( + ExceptionHistoryEntryTestingUtils.matchesFailure( + entry0, restartableException, updateStateTriggeringRestartTimestamp, taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), - taskFailureExecutionVertex.getCurrentAssignedResourceLocation()), - ExceptionHistoryEntryMatcher.matchesGlobalFailure( - failingException, updateStateTriggeringJobFailureTimestamp))); + taskFailureExecutionVertex.getCurrentAssignedResourceLocation())) + .isTrue(); + RootExceptionHistoryEntry entry1 = iterator.next(); + assertThat( + ExceptionHistoryEntryTestingUtils.matchesGlobalFailure( + entry1, failingException, updateStateTriggeringJobFailureTimestamp)) + .isTrue(); } @Test - public void testExceptionHistoryWithPreDeployFailure() { + void testExceptionHistoryWithPreDeployFailure() { // disable auto-completing slot requests to simulate timeout executionSlotAllocatorFactory .getTestExecutionSlotAllocator() @@ -1263,8 +1247,7 @@ public class DefaultSchedulerTest extends TestLogger { taskRestartExecutor.triggerNonPeriodicScheduledTask(); // sanity check that the TaskManagerLocation of the failed task is indeed null, as expected - assertThat( - taskFailureExecutionVertex.getCurrentAssignedResourceLocation(), is(nullValue())); + assertThat(taskFailureExecutionVertex.getCurrentAssignedResourceLocation()).isNull(); final ErrorInfo failureInfo = taskFailureExecutionVertex @@ -1273,18 +1256,20 @@ public class DefaultSchedulerTest extends TestLogger { final Iterable<RootExceptionHistoryEntry> actualExceptionHistory = scheduler.getExceptionHistory(); - assertThat( - actualExceptionHistory, - IsIterableContainingInOrder.contains( - ExceptionHistoryEntryMatcher.matchesFailure( - failureInfo.getException(), - failureInfo.getTimestamp(), - taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), - taskFailureExecutionVertex.getCurrentAssignedResourceLocation()))); + assertThat(actualExceptionHistory) + .anySatisfy( + e -> + ExceptionHistoryEntryTestingUtils.matchesFailure( + e, + failureInfo.getException(), + failureInfo.getTimestamp(), + taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), + taskFailureExecutionVertex + .getCurrentAssignedResourceLocation())); } @Test - public void testExceptionHistoryConcurrentRestart() throws Exception { + void testExceptionHistoryConcurrentRestart() throws Exception { final JobGraph jobGraph = singleJobVertexJobGraph(2); final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); @@ -1336,42 +1321,43 @@ public class DefaultSchedulerTest extends TestLogger { delayExecutor.triggerNonPeriodicScheduledTasks(); - assertThat(scheduler.getExceptionHistory(), IsIterableWithSize.iterableWithSize(2)); + assertThat(scheduler.getExceptionHistory()).hasSize(2); final Iterator<RootExceptionHistoryEntry> actualExceptionHistory = scheduler.getExceptionHistory().iterator(); final RootExceptionHistoryEntry entry0 = actualExceptionHistory.next(); assertThat( - entry0, - is( - ExceptionHistoryEntryMatcher.matchesFailure( + ExceptionHistoryEntryTestingUtils.matchesFailure( + entry0, exception0, updateStateTriggeringRestartTimestamp0, executionVertex0.getTaskNameWithSubtaskIndex(), - executionVertex0.getCurrentAssignedResourceLocation()))); - assertThat( - entry0.getConcurrentExceptions(), - IsIterableContainingInOrder.contains( - ExceptionHistoryEntryMatcher.matchesFailure( - exception1, - updateStateTriggeringRestartTimestamp1, - executionVertex1.getTaskNameWithSubtaskIndex(), - executionVertex1.getCurrentAssignedResourceLocation()))); + executionVertex0.getCurrentAssignedResourceLocation())) + .isTrue(); + assertThat(entry0.getConcurrentExceptions()) + .anySatisfy( + e -> + ExceptionHistoryEntryTestingUtils.matchesFailure( + e, + exception1, + updateStateTriggeringRestartTimestamp1, + executionVertex1.getTaskNameWithSubtaskIndex(), + executionVertex1.getCurrentAssignedResourceLocation())); final RootExceptionHistoryEntry entry1 = actualExceptionHistory.next(); assertThat( - entry1, - is( - ExceptionHistoryEntryMatcher.matchesFailure( + ExceptionHistoryEntryTestingUtils.matchesFailure( + entry1, exception1, updateStateTriggeringRestartTimestamp1, executionVertex1.getTaskNameWithSubtaskIndex(), - executionVertex1.getCurrentAssignedResourceLocation()))); - assertThat(entry1.getConcurrentExceptions(), IsEmptyIterable.emptyIterable()); + executionVertex1.getCurrentAssignedResourceLocation())) + .isTrue(); + assertThat(entry1.getConcurrentExceptions()).isEmpty(); } @Test - public void testExceptionHistoryTruncation() { + void testExceptionHistoryTruncation() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); configuration.set(WebOptions.MAX_EXCEPTION_HISTORY_SIZE, 1); @@ -1403,18 +1389,19 @@ public class DefaultSchedulerTest extends TestLogger { taskRestartExecutor.triggerNonPeriodicScheduledTasks(); - assertThat( - scheduler.getExceptionHistory(), - IsIterableContainingInOrder.contains( - ExceptionHistoryEntryMatcher.matchesFailure( - exception, - relevantTimestamp, - executionVertex1.getTaskNameWithSubtaskIndex(), - executionVertex1.getCurrentAssignedResourceLocation()))); + assertThat(scheduler.getExceptionHistory()) + .anySatisfy( + e -> + ExceptionHistoryEntryTestingUtils.matchesFailure( + e, + exception, + relevantTimestamp, + executionVertex1.getTaskNameWithSubtaskIndex(), + executionVertex1.getCurrentAssignedResourceLocation())); } @Test - public void testStatusMetrics() throws Exception { + void testStatusMetrics() throws Exception { // running time acts as a stand-in for generic status time metrics final CompletableFuture<Gauge<Long>> runningTimeMetricFuture = new CompletableFuture<>(); final MetricRegistry metricRegistry = @@ -1492,11 +1479,11 @@ public class DefaultSchedulerTest extends TestLogger { Thread.sleep(10L); final Gauge<Long> runningTimeGauge = runningTimeMetricFuture.get(); - Assert.assertThat(runningTimeGauge.getValue(), greaterThan(0L)); + assertThat(runningTimeGauge.getValue()).isGreaterThan(0L); } @Test - public void testDeploymentWaitForProducedPartitionRegistration() { + void testDeploymentWaitForProducedPartitionRegistration() { shuffleMaster.setAutoCompleteRegistration(false); final List<ResultPartitionID> trackedPartitions = new ArrayList<>(); @@ -1511,44 +1498,44 @@ public class DefaultSchedulerTest extends TestLogger { createSchedulerAndStartScheduling(jobGraph); - assertThat(trackedPartitions, hasSize(0)); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(trackedPartitions).isEmpty(); + assertThat(testExecutionOperations.getDeployedVertices()).isEmpty(); shuffleMaster.completeAllPendingRegistrations(); - assertThat(trackedPartitions, hasSize(1)); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(2)); + assertThat(trackedPartitions).hasSize(1); + assertThat(testExecutionOperations.getDeployedVertices()).hasSize(2); } @Test - public void testFailedProducedPartitionRegistration() { + void testFailedProducedPartitionRegistration() { shuffleMaster.setAutoCompleteRegistration(false); final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); createSchedulerAndStartScheduling(jobGraph); - assertThat(testExecutionOperations.getCanceledVertices(), hasSize(0)); - assertThat(testExecutionOperations.getFailedVertices(), hasSize(0)); + assertThat(testExecutionOperations.getCanceledVertices()).isEmpty(); + assertThat(testExecutionOperations.getFailedVertices()).isEmpty(); shuffleMaster.failAllPendingRegistrations(); - assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2)); - assertThat(testExecutionOperations.getFailedVertices(), hasSize(1)); + assertThat(testExecutionOperations.getCanceledVertices()).hasSize(2); + assertThat(testExecutionOperations.getFailedVertices()).hasSize(1); } @Test - public void testDirectExceptionOnProducedPartitionRegistration() { + void testDirectExceptionOnProducedPartitionRegistration() { shuffleMaster.setThrowExceptionalOnRegistration(true); final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); createSchedulerAndStartScheduling(jobGraph); - assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2)); - assertThat(testExecutionOperations.getFailedVertices(), hasSize(1)); + assertThat(testExecutionOperations.getCanceledVertices()).hasSize(2); + assertThat(testExecutionOperations.getFailedVertices()).hasSize(1); } @Test - public void testProducedPartitionRegistrationTimeout() throws Exception { + void testProducedPartitionRegistrationTimeout() throws Exception { ScheduledExecutorService scheduledExecutorService = null; try { scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); @@ -1573,7 +1560,7 @@ public class DefaultSchedulerTest extends TestLogger { } @Test - public void testLateRegisteredPartitionsWillBeReleased() { + void testLateRegisteredPartitionsWillBeReleased() { shuffleMaster.setAutoCompleteRegistration(false); final List<ResultPartitionID> trackedPartitions = new ArrayList<>(); @@ -1601,12 +1588,12 @@ public class DefaultSchedulerTest extends TestLogger { // late registered partitions will not be tracked and will be released shuffleMaster.completeAllPendingRegistrations(); - assertThat(trackedPartitions, hasSize(0)); - assertThat(shuffleMaster.getExternallyReleasedPartitions(), hasSize(1)); + assertThat(trackedPartitions).isEmpty(); + assertThat(shuffleMaster.getExternallyReleasedPartitions()).hasSize(1); } @Test - public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception { + void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception { final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); try { @@ -1635,17 +1622,17 @@ public class DefaultSchedulerTest extends TestLogger { } @Test - public void testJobStatusHookWithJobFailed() throws Exception { + void testJobStatusHookWithJobFailed() throws Exception { commonJobStatusHookTest(ExecutionState.FAILED, JobStatus.FAILED); } @Test - public void testJobStatusHookWithJobCanceled() throws Exception { + void testJobStatusHookWithJobCanceled() throws Exception { commonJobStatusHookTest(ExecutionState.CANCELED, JobStatus.CANCELED); } @Test - public void testJobStatusHookWithJobFinished() throws Exception { + void testJobStatusHookWithJobFinished() throws Exception { commonJobStatusHookTest(ExecutionState.FINISHED, JobStatus.FINISHED); } @@ -1701,14 +1688,10 @@ public class DefaultSchedulerTest extends TestLogger { waitForTermination(scheduler); final JobStatus jobStatus = scheduler.requestJobStatus(); - org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus); - org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1); - org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0)) - .isEqualTo(jobGraph.getJobID()); + assertThat(jobStatus).isEqualTo(expectedJobStatus); + assertThat(onCreatedJobList).singleElement().isEqualTo(jobGraph.getJobID()); - org.assertj.core.api.Assertions.assertThat(onJobStatusList).hasSize(1); - org.assertj.core.api.Assertions.assertThat(onJobStatusList.get(0)) - .isEqualTo(jobGraph.getJobID()); + assertThat(onCreatedJobList).singleElement().isEqualTo(jobGraph.getJobID()); } /** @@ -1775,9 +1758,9 @@ public class DefaultSchedulerTest extends TestLogger { // Wait for scheduler to start closing. schedulerClosing.await(); - assertFalse( - "CheckpointCleaner should not close before checkpoint services.", - cleanerClosed.await(10, TimeUnit.MILLISECONDS)); + assertThat(cleanerClosed.await(10, TimeUnit.MILLISECONDS)) + .withFailMessage("CheckpointCleaner should not close before checkpoint services.") + .isFalse(); checkpointServicesShutdownBlocked.countDown(); cleanerClosed.await(); schedulerClosed.get(); @@ -1992,7 +1975,7 @@ public class DefaultSchedulerTest extends TestLogger { } /** Actually schedules the collected {@link ScheduledTask ScheduledTasks}. */ - public void scheduleCollectedScheduledTasks() { + void scheduleCollectedScheduledTasks() { for (ScheduledTask<?> scheduledTask : scheduledTasks) { super.schedule( scheduledTask.getCallable(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java new file mode 100644 index 00000000000..08fad7e96b9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Objects; + +import static org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation; + +/** A utility class to matches {@link ExceptionHistoryEntry} instances for testing. */ +public class ExceptionHistoryEntryTestingUtils { + + public static boolean matchesGlobalFailure( + ExceptionHistoryEntry exceptionHistoryEntry, + Throwable expectedException, + long expectedTimestamp) { + return matchesInternal( + exceptionHistoryEntry, expectedException, expectedTimestamp, null, null); + } + + public static boolean matchesFailure( + ExceptionHistoryEntry exceptionHistoryEntry, + Throwable expectedException, + long expectedTimestamp, + String expectedTaskName, + TaskManagerLocation expectedTaskManagerLocation) { + return matchesInternal( + exceptionHistoryEntry, + expectedException, + expectedTimestamp, + expectedTaskName, + expectedTaskManagerLocation); + } + + private static boolean matchesInternal( + ExceptionHistoryEntry exceptionHistoryEntry, + Throwable expectedException, + long expectedTimestamp, + String expectedTaskName, + TaskManagerLocation expectedTaskManagerLocation) { + boolean match = + exceptionHistoryEntry + .getException() + .deserializeError(ClassLoader.getSystemClassLoader()) + .equals(expectedException) + && exceptionHistoryEntry.getTimestamp() == expectedTimestamp + && !Objects.equals( + exceptionHistoryEntry.getFailingTaskName(), expectedTaskName); + + match |= + matchesTaskManagerLocation( + exceptionHistoryEntry.getTaskManagerLocation(), + fromTaskManagerLocation(expectedTaskManagerLocation)); + + return match; + } + + private static boolean matchesTaskManagerLocation( + ExceptionHistoryEntry.ArchivedTaskManagerLocation actual, + ExceptionHistoryEntry.ArchivedTaskManagerLocation expectedLocation) { + if (actual == null) { + return expectedLocation == null; + } else if (expectedLocation == null) { + return false; + } + + return Objects.equals(actual.getAddress(), expectedLocation.getAddress()) + && Objects.equals(actual.getFQDNHostname(), expectedLocation.getFQDNHostname()) + && Objects.equals(actual.getHostname(), expectedLocation.getHostname()) + && Objects.equals(actual.getResourceID(), expectedLocation.getResourceID()) + && Objects.equals(actual.getPort(), expectedLocation.getPort()); + } +}