This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0351f88cea26dc57ca7aeb78564a8e2acffc505a
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Wed Feb 15 18:30:03 2023 +0800

    [hotfix] Migrate DefaultSchedulerTest to Junit5 and AssertJ.
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 453 ++++++++++-----------
 .../ExceptionHistoryEntryTestingUtils.java         |  90 ++++
 2 files changed, 308 insertions(+), 235 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 59122521cb2..49ba055732b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -75,7 +75,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
-import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryMatcher;
+import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryTestingUtils;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
@@ -99,16 +99,9 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
-import org.hamcrest.collection.IsEmptyIterable;
-import org.hamcrest.collection.IsIterableContainingInOrder;
-import org.hamcrest.collection.IsIterableWithSize;
-import org.hamcrest.core.Is;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 
 import java.time.Duration;
@@ -141,28 +134,14 @@ import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableChe
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link DefaultScheduler}. */
 public class DefaultSchedulerTest extends TestLogger {
 
     private static final int TIMEOUT_MS = 1000;
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-
     private final ManuallyTriggeredScheduledExecutor taskRestartExecutor =
             new ManuallyTriggeredScheduledExecutor();
 
@@ -188,8 +167,8 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private Time timeout;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() {
         executor = Executors.newSingleThreadExecutor();
         scheduledExecutorService = new DirectScheduledExecutorService();
 
@@ -211,8 +190,8 @@ public class DefaultSchedulerTest extends TestLogger {
         timeout = Time.seconds(60);
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    void tearDown() {
         if (scheduledExecutorService != null) {
             ExecutorUtils.gracefulShutdown(
                     TIMEOUT_MS, TimeUnit.MILLISECONDS, 
scheduledExecutorService);
@@ -224,7 +203,7 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void startScheduling() {
+    void startScheduling() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
 
@@ -234,11 +213,11 @@ public class DefaultSchedulerTest extends TestLogger {
                 testExecutionOperations.getDeployedVertices();
 
         final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
-        assertThat(deployedExecutionVertices, contains(executionVertexId));
+        assertThat(deployedExecutionVertices).contains(executionVertexId);
     }
 
     @Test
-    public void testCorrectSettingOfInitializationTimestamp() {
+    void testCorrectSettingOfInitializationTimestamp() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
@@ -248,20 +227,18 @@ public class DefaultSchedulerTest extends TestLogger {
                 executionGraphInfo.getArchivedExecutionGraph();
 
         // ensure all statuses are set in the ExecutionGraph
-        assertThat(
-                
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING), 
greaterThan(0L));
-        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED), 
greaterThan(0L));
-        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING), 
greaterThan(0L));
+        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING))
+                .isGreaterThan(0L);
+        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)).isGreaterThan(0L);
+        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING)).isGreaterThan(0L);
 
         // ensure correct order
-        assertThat(
-                
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)
-                        <= 
archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED),
-                Is.is(true));
+        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING))
+                
.isLessThanOrEqualTo(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED));
     }
 
     @Test
-    public void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws 
Exception {
+    void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(4);
         final JobVertexID onlyJobVertexId = getOnlyJobVertex(jobGraph).getID();
 
@@ -285,17 +262,17 @@ public class DefaultSchedulerTest extends TestLogger {
                         new ExecutionVertexID(onlyJobVertexId, 3));
         schedulingStrategy.schedule(verticesToSchedule);
 
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
 
         
testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0));
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
 
         testExecutionSlotAllocator.completePendingRequests();
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(4));
+        assertThat(testExecutionOperations.getDeployedVertices()).hasSize(4);
     }
 
     @Test
-    public void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws 
Exception {
+    void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws 
Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(10);
         final JobVertexID onlyJobVertexId = getOnlyJobVertex(jobGraph).getID();
 
@@ -321,11 +298,11 @@ public class DefaultSchedulerTest extends TestLogger {
         final List<ExecutionVertexID> deployedExecutionVertices =
                 testExecutionOperations.getDeployedVertices();
 
-        assertEquals(desiredScheduleOrder, deployedExecutionVertices);
+        assertThat(desiredScheduleOrder).isEqualTo(deployedExecutionVertices);
     }
 
     @Test
-    public void restartAfterDeploymentFails() {
+    void restartAfterDeploymentFails() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
 
@@ -340,11 +317,11 @@ public class DefaultSchedulerTest extends TestLogger {
                 testExecutionOperations.getDeployedVertices();
 
         final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
-        assertThat(deployedExecutionVertices, contains(executionVertexId, 
executionVertexId));
+        assertThat(deployedExecutionVertices).contains(executionVertexId, 
executionVertexId);
     }
 
     @Test
-    public void restartFailedTask() {
+    void restartFailedTask() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
 
@@ -366,22 +343,22 @@ public class DefaultSchedulerTest extends TestLogger {
         final List<ExecutionVertexID> deployedExecutionVertices =
                 testExecutionOperations.getDeployedVertices();
         final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
-        assertThat(deployedExecutionVertices, contains(executionVertexId, 
executionVertexId));
+        assertThat(deployedExecutionVertices).contains(executionVertexId, 
executionVertexId);
     }
 
     @Test
-    public void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
+    void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
 
         final TaskExecutionState taskExecutionState =
                 createFailedTaskExecutionState(createExecutionAttemptId());
 
-        assertFalse(scheduler.updateTaskExecutionState(taskExecutionState));
+        
assertThat(scheduler.updateTaskExecutionState(taskExecutionState)).isFalse();
     }
 
     @Test
-    public void failJobIfCannotRestart() throws Exception {
+    void failJobIfCannotRestart() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         testRestartBackoffTimeStrategy.setCanRestart(false);
 
@@ -402,11 +379,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         waitForTermination(scheduler);
         final JobStatus jobStatus = scheduler.requestJobStatus();
-        assertThat(jobStatus, is(equalTo(JobStatus.FAILED)));
+        assertThat(jobStatus).isEqualTo(JobStatus.FAILED);
     }
 
     @Test
-    public void failJobIfNotEnoughResources() throws Exception {
+    void failJobIfNotEnoughResources() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         testRestartBackoffTimeStrategy.setCanRestart(false);
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
@@ -417,7 +394,7 @@ public class DefaultSchedulerTest extends TestLogger {
 
         waitForTermination(scheduler);
         final JobStatus jobStatus = scheduler.requestJobStatus();
-        assertThat(jobStatus, is(equalTo(JobStatus.FAILED)));
+        assertThat(jobStatus).isEqualTo(JobStatus.FAILED);
 
         Throwable failureCause =
                 scheduler
@@ -426,24 +403,24 @@ public class DefaultSchedulerTest extends TestLogger {
                         .getFailureInfo()
                         .getException()
                         
.deserializeError(DefaultSchedulerTest.class.getClassLoader());
-        assertTrue(findThrowable(failureCause, 
NoResourceAvailableException.class).isPresent());
-        assertTrue(
-                findThrowableWithMessage(
+        assertThat(findThrowable(failureCause, 
NoResourceAvailableException.class)).isPresent();
+        assertThat(
+                        findThrowableWithMessage(
                                 failureCause,
-                                "Could not allocate the required slot within 
slot request timeout.")
-                        .isPresent());
-        assertThat(jobStatus, is(equalTo(JobStatus.FAILED)));
+                                "Could not allocate the required slot within 
slot request timeout."))
+                .isPresent();
+        assertThat(jobStatus).isEqualTo(JobStatus.FAILED);
     }
 
     @Test
-    public void restartVerticesOnSlotAllocationTimeout() throws Exception {
+    void restartVerticesOnSlotAllocationTimeout() throws Exception {
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
         testRestartVerticesOnFailuresInScheduling(
                 vid -> testExecutionSlotAllocator.timeoutPendingRequest(vid));
     }
 
     @Test
-    public void restartVerticesOnAssignedSlotReleased() throws Exception {
+    void restartVerticesOnAssignedSlotReleased() throws Exception {
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
         testRestartVerticesOnFailuresInScheduling(
                 vid -> {
@@ -481,7 +458,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1);
         schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22));
 
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(4));
+        assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(4);
 
         actionsToTriggerTaskFailure.accept(vid11);
 
@@ -498,20 +475,18 @@ public class DefaultSchedulerTest extends TestLogger {
 
         // ev11 and ev21 needs to be restarted because it is pipelined region 
failover and
         // they are in the same region. ev12 and ev22 will not be affected
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(2));
-        assertThat(ev11.getExecutionState(), is(ExecutionState.FAILED));
-        assertThat(ev21.getExecutionState(), is(ExecutionState.CANCELED));
-        assertThat(ev12.getExecutionState(), is(ExecutionState.SCHEDULED));
-        assertThat(ev22.getExecutionState(), is(ExecutionState.SCHEDULED));
+        assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
+        assertThat(ev11.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+        
assertThat(ev21.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
+        
assertThat(ev12.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);
+        
assertThat(ev22.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);
 
         taskRestartExecutor.triggerScheduledTasks();
-        assertThat(
-                schedulingStrategy.getReceivedVerticesToRestart(),
-                containsInAnyOrder(vid11, vid21));
+        
assertThat(schedulingStrategy.getReceivedVerticesToRestart()).contains(vid11, 
vid21);
     }
 
     @Test
-    public void skipDeploymentIfVertexVersionOutdated() {
+    void skipDeploymentIfVertexVersionOutdated() {
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
         final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
@@ -540,16 +515,14 @@ public class DefaultSchedulerTest extends TestLogger {
         testExecutionSlotAllocator.enableAutoCompletePendingRequests();
         taskRestartExecutor.triggerScheduledTasks();
 
-        assertThat(
-                testExecutionOperations.getDeployedVertices(),
-                containsInAnyOrder(sourceExecutionVertexId, 
sinkExecutionVertexId));
-        assertThat(
-                scheduler.requestJob().getArchivedExecutionGraph().getState(),
-                is(equalTo(JobStatus.RUNNING)));
+        assertThat(testExecutionOperations.getDeployedVertices())
+                .contains(sourceExecutionVertexId, sinkExecutionVertexId);
+        
assertThat(scheduler.requestJob().getArchivedExecutionGraph().getState())
+                .isEqualTo(JobStatus.RUNNING);
     }
 
     @Test
-    public void releaseSlotIfVertexVersionOutdated() {
+    void releaseSlotIfVertexVersionOutdated() {
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
@@ -561,11 +534,11 @@ public class DefaultSchedulerTest extends TestLogger {
         executionVertexVersioner.recordModification(onlyExecutionVertexId);
         testExecutionSlotAllocator.completePendingRequests();
 
-        assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(1));
+        assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(1);
     }
 
     @Test
-    public void vertexIsResetBeforeRestarted() throws Exception {
+    void vertexIsResetBeforeRestarted() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 
         final TestSchedulingStrategy.Factory schedulingStrategyFactory =
@@ -597,12 +570,12 @@ public class DefaultSchedulerTest extends TestLogger {
 
         taskRestartExecutor.triggerScheduledTasks();
 
-        assertThat(schedulingStrategy.getReceivedVerticesToRestart(), 
hasSize(1));
-        assertThat(onlySchedulingVertex.getState(), 
is(equalTo(ExecutionState.CREATED)));
+        
assertThat(schedulingStrategy.getReceivedVerticesToRestart()).hasSize(1);
+        
assertThat(onlySchedulingVertex.getState()).isEqualTo(ExecutionState.CREATED);
     }
 
     @Test
-    public void scheduleOnlyIfVertexIsCreated() throws Exception {
+    void scheduleOnlyIfVertexIsCreated() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 
         final TestSchedulingStrategy.Factory schedulingStrategyFactory =
@@ -625,16 +598,16 @@ public class DefaultSchedulerTest extends TestLogger {
         
schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId));
 
         // The scheduling of a non-CREATED vertex will result in 
IllegalStateException
-        try {
-            
schedulingStrategy.schedule(Collections.singletonList(onlySchedulingVertexId));
-            fail("IllegalStateException should happen");
-        } catch (IllegalStateException e) {
-            // expected exception
-        }
+        assertThatThrownBy(
+                        () ->
+                                schedulingStrategy.schedule(
+                                        
Collections.singletonList(onlySchedulingVertexId)),
+                        "IllegalStateException should happen")
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
-    public void handleGlobalFailure() {
+    void handleGlobalFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
 
@@ -658,7 +631,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final List<ExecutionVertexID> deployedExecutionVertices =
                 testExecutionOperations.getDeployedVertices();
         final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
-        assertThat(deployedExecutionVertices, contains(executionVertexId, 
executionVertexId));
+        assertThat(deployedExecutionVertices).contains(executionVertexId, 
executionVertexId);
     }
 
     /**
@@ -668,7 +641,7 @@ public class DefaultSchedulerTest extends TestLogger {
      * updates.
      */
     @Test
-    public void handleGlobalFailureWithLocalFailure() {
+    void handleGlobalFailureWithLocalFailure() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
         enableCheckpointing(jobGraph);
@@ -706,24 +679,26 @@ public class DefaultSchedulerTest extends TestLogger {
                 new ExecutionVertexID(onlyJobVertex.getID(), 0);
         final ExecutionVertexID executionVertexId1 =
                 new ExecutionVertexID(onlyJobVertex.getID(), 1);
-        assertThat(
-                "The execution vertices should be deployed in a specific order 
reflecting the scheduling start and the global fail-over afterwards.",
-                testExecutionOperations.getDeployedVertices(),
-                contains(
+        assertThat(testExecutionOperations.getDeployedVertices())
+                .withFailMessage(
+                        "The "
+                                + "execution vertices should be deployed in a 
specific order reflecting the "
+                                + "scheduling start and the global fail-over 
afterwards.")
+                .contains(
                         executionVertexId0,
                         executionVertexId1,
                         executionVertexId0,
-                        executionVertexId1));
+                        executionVertexId1);
     }
 
     @Test
-    public void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
+    void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
         assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
                 SchedulerBase::startCheckpointScheduler);
     }
 
     @Test
-    public void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
+    void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
         assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(
                 SchedulerBase::stopCheckpointScheduler);
     }
@@ -734,7 +709,7 @@ public class DefaultSchedulerTest extends TestLogger {
         enableCheckpointing(jobGraph);
 
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
-        assertThat(scheduler.getCheckpointCoordinator(), is(notNullValue()));
+        assertThat(scheduler.getCheckpointCoordinator()).isNotNull();
         scheduler.updateTaskExecutionState(
                 new TaskExecutionState(
                         Iterables.getOnlyElement(
@@ -743,13 +718,13 @@ public class DefaultSchedulerTest extends TestLogger {
                                 .getAttemptId(),
                         ExecutionState.FINISHED));
 
-        assertThat(scheduler.getCheckpointCoordinator(), is(nullValue()));
+        assertThat(scheduler.getCheckpointCoordinator()).isNull();
         callSchedulingOperation.accept(scheduler);
-        assertThat(scheduler.getCheckpointCoordinator(), is(nullValue()));
+        assertThat(scheduler.getCheckpointCoordinator()).isNull();
     }
 
     @Test
-    public void vertexIsNotAffectedByOutdatedDeployment() {
+    void vertexIsNotAffectedByOutdatedDeployment() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
 
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
@@ -779,11 +754,11 @@ public class DefaultSchedulerTest extends TestLogger {
                 
createFailedTaskExecutionState(v2.getCurrentExecutionAttempt().getAttemptId()));
 
         // v1 should not be affected
-        assertThat(sv1.getState(), is(equalTo(ExecutionState.SCHEDULED)));
+        assertThat(sv1.getState()).isEqualTo(ExecutionState.SCHEDULED);
     }
 
     @Test
-    public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
+    void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         enableCheckpointing(jobGraph);
 
@@ -809,15 +784,15 @@ public class DefaultSchedulerTest extends TestLogger {
 
         checkpointCoordinator.triggerCheckpoint(false);
         checkpointTriggeredLatch.await();
-        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(1)));
+        
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
-        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
+        
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
     }
 
     @Test
-    public void restoreStateWhenRestartingTasks() throws Exception {
+    void restoreStateWhenRestartingTasks() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         enableCheckpointing(jobGraph);
 
@@ -854,11 +829,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
-        assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
+        assertThat(masterHook.getRestoreCount()).isOne();
     }
 
     @Test
-    public void failGlobalWhenRestoringStateFails() throws Exception {
+    void failGlobalWhenRestoringStateFails() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
         enableCheckpointing(jobGraph);
@@ -902,17 +877,17 @@ public class DefaultSchedulerTest extends TestLogger {
         List<ExecutionVertexID> deployedExecutionVertices =
                 testExecutionOperations.getDeployedVertices();
         final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
-        assertThat(deployedExecutionVertices, contains(executionVertexId));
+        assertThat(deployedExecutionVertices).contains(executionVertexId);
 
         // a global failure should be triggered on state restore failure
         masterHook.disableFailOnRestore();
         taskRestartExecutor.triggerScheduledTasks();
         deployedExecutionVertices = 
testExecutionOperations.getDeployedVertices();
-        assertThat(deployedExecutionVertices, contains(executionVertexId, 
executionVertexId));
+        assertThat(deployedExecutionVertices).contains(executionVertexId, 
executionVertexId);
     }
 
     @Test
-    public void failJobWillIncrementVertexVersions() {
+    void failJobWillIncrementVertexVersions() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
         final ExecutionVertexID onlyExecutionVertexId =
@@ -924,11 +899,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         scheduler.failJob(new FlinkException("Test failure."), 
System.currentTimeMillis());
 
-        
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
+        
assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue();
     }
 
     @Test
-    public void cancelJobWillIncrementVertexVersions() {
+    void cancelJobWillIncrementVertexVersions() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
         final ExecutionVertexID onlyExecutionVertexId =
@@ -940,11 +915,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         scheduler.cancel();
 
-        
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
+        
assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue();
     }
 
     @Test
-    public void suspendJobWillIncrementVertexVersions() throws Exception {
+    void suspendJobWillIncrementVertexVersions() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
         final ExecutionVertexID onlyExecutionVertexId =
@@ -956,11 +931,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         scheduler.close();
 
-        
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
+        
assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue();
     }
 
     @Test
-    public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
+    void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
 
@@ -988,13 +963,13 @@ public class DefaultSchedulerTest extends TestLogger {
         taskRestartExecutor.triggerNonPeriodicScheduledTask();
         final JobStatus jobStatusAfterRestarts = scheduler.requestJobStatus();
 
-        assertThat(jobStatusAfterFirstFailure, equalTo(JobStatus.RESTARTING));
-        assertThat(jobStatusWithPendingRestarts, 
equalTo(JobStatus.RESTARTING));
-        assertThat(jobStatusAfterRestarts, equalTo(JobStatus.RUNNING));
+        assertThat(jobStatusAfterFirstFailure).isEqualTo(JobStatus.RESTARTING);
+        
assertThat(jobStatusWithPendingRestarts).isEqualTo(JobStatus.RESTARTING);
+        assertThat(jobStatusAfterRestarts).isEqualTo(JobStatus.RUNNING);
     }
 
     @Test
-    public void cancelWhileRestartingShouldWaitForRunningTasks() {
+    void cancelWhileRestartingShouldWaitForRunningTasks() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
         final SchedulingTopology topology = scheduler.getSchedulingTopology();
@@ -1022,13 +997,13 @@ public class DefaultSchedulerTest extends TestLogger {
                 new TaskExecutionState(
                         attemptId2, ExecutionState.CANCELED, new 
RuntimeException("expected")));
 
-        assertThat(vertex2StateAfterCancel, 
is(equalTo(ExecutionState.CANCELING)));
-        assertThat(statusAfterCancelWhileRestarting, 
is(equalTo(JobStatus.CANCELLING)));
-        assertThat(scheduler.requestJobStatus(), 
is(equalTo(JobStatus.CANCELED)));
+        
assertThat(vertex2StateAfterCancel).isEqualTo(ExecutionState.CANCELING);
+        
assertThat(statusAfterCancelWhileRestarting).isEqualTo(JobStatus.CANCELLING);
+        assertThat(scheduler.requestJobStatus()).isEqualTo(JobStatus.CANCELED);
     }
 
     @Test
-    public void failureInfoIsSetAfterTaskFailure() {
+    void failureInfoIsSetAfterTaskFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
 
@@ -1048,12 +1023,12 @@ public class DefaultSchedulerTest extends TestLogger {
 
         final ErrorInfo failureInfo =
                 
scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
-        assertThat(failureInfo, is(notNullValue()));
-        assertThat(failureInfo.getExceptionAsString(), 
containsString(exceptionMessage));
+        assertThat(failureInfo).isNotNull();
+        
assertThat(failureInfo.getExceptionAsString()).contains(exceptionMessage);
     }
 
     @Test
-    public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws 
Exception {
+    void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
@@ -1073,7 +1048,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         .iterator();
         ArchivedExecutionVertex v1 = vertexIterator.next();
 
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(2));
+        assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
 
         final String exceptionMessage = "expected exception";
         scheduler.updateTaskExecutionState(
@@ -1090,13 +1065,13 @@ public class DefaultSchedulerTest extends TestLogger {
                         .iterator();
         v1 = vertexIterator.next();
         ArchivedExecutionVertex v2 = vertexIterator.next();
-        assertThat(v1.getExecutionState(), is(ExecutionState.FAILED));
-        assertThat(v2.getExecutionState(), is(ExecutionState.CANCELED));
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
+        assertThat(v1.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+        assertThat(v2.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
+        assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty();
     }
 
     @Test
-    public void 
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots()
+    void 
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots()
             throws Exception {
         final int parallelism = 10;
         final JobGraph jobGraph = sourceSinkJobGraph(parallelism);
@@ -1118,12 +1093,11 @@ public class DefaultSchedulerTest extends TestLogger {
                 
testExecutionSlotAllocator.getPendingRequests().values().stream()
                         .map(ExecutionSlotAssignment::getLogicalSlotFuture)
                         .collect(Collectors.toSet());
-        assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2));
+        assertThat(pendingLogicalSlotFutures).hasSize(parallelism * 2);
 
         testExecutionSlotAllocator.completePendingRequest(ev1.getID());
-        assertThat(
-                
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count(),
-                is(1L));
+        
assertThat(pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count())
+                .isEqualTo(1L);
 
         final String exceptionMessage = "expected exception";
         scheduler.updateTaskExecutionState(
@@ -1132,21 +1106,23 @@ public class DefaultSchedulerTest extends TestLogger {
                         ExecutionState.FAILED,
                         new RuntimeException(exceptionMessage)));
 
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
+        assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty();
 
         // the failed task will return its slot before triggering failover. 
And the slot
         // will be returned and re-assigned to another task which is waiting 
for a slot.
         // failover will be triggered after that and the re-assigned slot will 
be returned
         // once the attached task is canceled, but the slot will not be 
assigned to other
         // tasks which are identified to be restarted soon.
-        assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(2));
+        assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(2);
         assertThat(
-                
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count(),
-                is(parallelism * 2L - 2L));
+                        pendingLogicalSlotFutures.stream()
+                                .filter(CompletableFuture::isCancelled)
+                                .count())
+                .isEqualTo(parallelism * 2L - 2L);
     }
 
     @Test
-    public void testExceptionHistoryWithGlobalFailOver() {
+    void testExceptionHistoryWithGlobalFailOver() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
 
@@ -1171,19 +1147,21 @@ public class DefaultSchedulerTest extends TestLogger {
         final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
                 scheduler.getExceptionHistory();
 
-        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+        assertThat(actualExceptionHistory).hasSize(1);
 
         final RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
+
         assertThat(
-                failure,
-                ExceptionHistoryEntryMatcher.matchesGlobalFailure(
-                        expectedException,
-                        
scheduler.getExecutionGraph().getFailureInfo().getTimestamp()));
-        assertThat(failure.getConcurrentExceptions(), 
IsEmptyIterable.emptyIterable());
+                        ExceptionHistoryEntryTestingUtils.matchesGlobalFailure(
+                                failure,
+                                expectedException,
+                                
scheduler.getExecutionGraph().getFailureInfo().getTimestamp()))
+                .isTrue();
+        assertThat(failure.getConcurrentExceptions()).isEmpty();
     }
 
     @Test
-    public void testExceptionHistoryWithRestartableFailure() {
+    void testExceptionHistoryWithRestartableFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 
         final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
@@ -1229,20 +1207,26 @@ public class DefaultSchedulerTest extends TestLogger {
                 scheduler.getExceptionHistory();
 
         // assert restarted attempt
+        assertThat(actualExceptionHistory).hasSize(2);
+        Iterator<RootExceptionHistoryEntry> iterator = 
actualExceptionHistory.iterator();
+        RootExceptionHistoryEntry entry0 = iterator.next();
         assertThat(
-                actualExceptionHistory,
-                IsIterableContainingInOrder.contains(
-                        ExceptionHistoryEntryMatcher.matchesFailure(
+                        ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                entry0,
                                 restartableException,
                                 updateStateTriggeringRestartTimestamp,
                                 
taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(),
-                                
taskFailureExecutionVertex.getCurrentAssignedResourceLocation()),
-                        ExceptionHistoryEntryMatcher.matchesGlobalFailure(
-                                failingException, 
updateStateTriggeringJobFailureTimestamp)));
+                                
taskFailureExecutionVertex.getCurrentAssignedResourceLocation()))
+                .isTrue();
+        RootExceptionHistoryEntry entry1 = iterator.next();
+        assertThat(
+                        ExceptionHistoryEntryTestingUtils.matchesGlobalFailure(
+                                entry1, failingException, 
updateStateTriggeringJobFailureTimestamp))
+                .isTrue();
     }
 
     @Test
-    public void testExceptionHistoryWithPreDeployFailure() {
+    void testExceptionHistoryWithPreDeployFailure() {
         // disable auto-completing slot requests to simulate timeout
         executionSlotAllocatorFactory
                 .getTestExecutionSlotAllocator()
@@ -1263,8 +1247,7 @@ public class DefaultSchedulerTest extends TestLogger {
         taskRestartExecutor.triggerNonPeriodicScheduledTask();
 
         // sanity check that the TaskManagerLocation of the failed task is 
indeed null, as expected
-        assertThat(
-                
taskFailureExecutionVertex.getCurrentAssignedResourceLocation(), 
is(nullValue()));
+        
assertThat(taskFailureExecutionVertex.getCurrentAssignedResourceLocation()).isNull();
 
         final ErrorInfo failureInfo =
                 taskFailureExecutionVertex
@@ -1273,18 +1256,20 @@ public class DefaultSchedulerTest extends TestLogger {
 
         final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
                 scheduler.getExceptionHistory();
-        assertThat(
-                actualExceptionHistory,
-                IsIterableContainingInOrder.contains(
-                        ExceptionHistoryEntryMatcher.matchesFailure(
-                                failureInfo.getException(),
-                                failureInfo.getTimestamp(),
-                                
taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(),
-                                
taskFailureExecutionVertex.getCurrentAssignedResourceLocation())));
+        assertThat(actualExceptionHistory)
+                .anySatisfy(
+                        e ->
+                                
ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                        e,
+                                        failureInfo.getException(),
+                                        failureInfo.getTimestamp(),
+                                        
taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(),
+                                        taskFailureExecutionVertex
+                                                
.getCurrentAssignedResourceLocation()));
     }
 
     @Test
-    public void testExceptionHistoryConcurrentRestart() throws Exception {
+    void testExceptionHistoryConcurrentRestart() throws Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
 
         final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
@@ -1336,42 +1321,43 @@ public class DefaultSchedulerTest extends TestLogger {
 
         delayExecutor.triggerNonPeriodicScheduledTasks();
 
-        assertThat(scheduler.getExceptionHistory(), 
IsIterableWithSize.iterableWithSize(2));
+        assertThat(scheduler.getExceptionHistory()).hasSize(2);
         final Iterator<RootExceptionHistoryEntry> actualExceptionHistory =
                 scheduler.getExceptionHistory().iterator();
 
         final RootExceptionHistoryEntry entry0 = actualExceptionHistory.next();
         assertThat(
-                entry0,
-                is(
-                        ExceptionHistoryEntryMatcher.matchesFailure(
+                        ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                entry0,
                                 exception0,
                                 updateStateTriggeringRestartTimestamp0,
                                 executionVertex0.getTaskNameWithSubtaskIndex(),
-                                
executionVertex0.getCurrentAssignedResourceLocation())));
-        assertThat(
-                entry0.getConcurrentExceptions(),
-                IsIterableContainingInOrder.contains(
-                        ExceptionHistoryEntryMatcher.matchesFailure(
-                                exception1,
-                                updateStateTriggeringRestartTimestamp1,
-                                executionVertex1.getTaskNameWithSubtaskIndex(),
-                                
executionVertex1.getCurrentAssignedResourceLocation())));
+                                
executionVertex0.getCurrentAssignedResourceLocation()))
+                .isTrue();
+        assertThat(entry0.getConcurrentExceptions())
+                .anySatisfy(
+                        e ->
+                                
ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                        e,
+                                        exception1,
+                                        updateStateTriggeringRestartTimestamp1,
+                                        
executionVertex1.getTaskNameWithSubtaskIndex(),
+                                        
executionVertex1.getCurrentAssignedResourceLocation()));
 
         final RootExceptionHistoryEntry entry1 = actualExceptionHistory.next();
         assertThat(
-                entry1,
-                is(
-                        ExceptionHistoryEntryMatcher.matchesFailure(
+                        ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                entry1,
                                 exception1,
                                 updateStateTriggeringRestartTimestamp1,
                                 executionVertex1.getTaskNameWithSubtaskIndex(),
-                                
executionVertex1.getCurrentAssignedResourceLocation())));
-        assertThat(entry1.getConcurrentExceptions(), 
IsEmptyIterable.emptyIterable());
+                                
executionVertex1.getCurrentAssignedResourceLocation()))
+                .isTrue();
+        assertThat(entry1.getConcurrentExceptions()).isEmpty();
     }
 
     @Test
-    public void testExceptionHistoryTruncation() {
+    void testExceptionHistoryTruncation() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 
         configuration.set(WebOptions.MAX_EXCEPTION_HISTORY_SIZE, 1);
@@ -1403,18 +1389,19 @@ public class DefaultSchedulerTest extends TestLogger {
 
         taskRestartExecutor.triggerNonPeriodicScheduledTasks();
 
-        assertThat(
-                scheduler.getExceptionHistory(),
-                IsIterableContainingInOrder.contains(
-                        ExceptionHistoryEntryMatcher.matchesFailure(
-                                exception,
-                                relevantTimestamp,
-                                executionVertex1.getTaskNameWithSubtaskIndex(),
-                                
executionVertex1.getCurrentAssignedResourceLocation())));
+        assertThat(scheduler.getExceptionHistory())
+                .anySatisfy(
+                        e ->
+                                
ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                        e,
+                                        exception,
+                                        relevantTimestamp,
+                                        
executionVertex1.getTaskNameWithSubtaskIndex(),
+                                        
executionVertex1.getCurrentAssignedResourceLocation()));
     }
 
     @Test
-    public void testStatusMetrics() throws Exception {
+    void testStatusMetrics() throws Exception {
         // running time acts as a stand-in for generic status time metrics
         final CompletableFuture<Gauge<Long>> runningTimeMetricFuture = new 
CompletableFuture<>();
         final MetricRegistry metricRegistry =
@@ -1492,11 +1479,11 @@ public class DefaultSchedulerTest extends TestLogger {
         Thread.sleep(10L);
 
         final Gauge<Long> runningTimeGauge = runningTimeMetricFuture.get();
-        Assert.assertThat(runningTimeGauge.getValue(), greaterThan(0L));
+        assertThat(runningTimeGauge.getValue()).isGreaterThan(0L);
     }
 
     @Test
-    public void testDeploymentWaitForProducedPartitionRegistration() {
+    void testDeploymentWaitForProducedPartitionRegistration() {
         shuffleMaster.setAutoCompleteRegistration(false);
 
         final List<ResultPartitionID> trackedPartitions = new ArrayList<>();
@@ -1511,44 +1498,44 @@ public class DefaultSchedulerTest extends TestLogger {
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(trackedPartitions, hasSize(0));
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(trackedPartitions).isEmpty();
+        assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
 
         shuffleMaster.completeAllPendingRegistrations();
-        assertThat(trackedPartitions, hasSize(1));
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(2));
+        assertThat(trackedPartitions).hasSize(1);
+        assertThat(testExecutionOperations.getDeployedVertices()).hasSize(2);
     }
 
     @Test
-    public void testFailedProducedPartitionRegistration() {
+    void testFailedProducedPartitionRegistration() {
         shuffleMaster.setAutoCompleteRegistration(false);
 
         final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(testExecutionOperations.getCanceledVertices(), hasSize(0));
-        assertThat(testExecutionOperations.getFailedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getCanceledVertices()).isEmpty();
+        assertThat(testExecutionOperations.getFailedVertices()).isEmpty();
 
         shuffleMaster.failAllPendingRegistrations();
-        assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2));
-        assertThat(testExecutionOperations.getFailedVertices(), hasSize(1));
+        assertThat(testExecutionOperations.getCanceledVertices()).hasSize(2);
+        assertThat(testExecutionOperations.getFailedVertices()).hasSize(1);
     }
 
     @Test
-    public void testDirectExceptionOnProducedPartitionRegistration() {
+    void testDirectExceptionOnProducedPartitionRegistration() {
         shuffleMaster.setThrowExceptionalOnRegistration(true);
 
         final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2));
-        assertThat(testExecutionOperations.getFailedVertices(), hasSize(1));
+        assertThat(testExecutionOperations.getCanceledVertices()).hasSize(2);
+        assertThat(testExecutionOperations.getFailedVertices()).hasSize(1);
     }
 
     @Test
-    public void testProducedPartitionRegistrationTimeout() throws Exception {
+    void testProducedPartitionRegistrationTimeout() throws Exception {
         ScheduledExecutorService scheduledExecutorService = null;
         try {
             scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
@@ -1573,7 +1560,7 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testLateRegisteredPartitionsWillBeReleased() {
+    void testLateRegisteredPartitionsWillBeReleased() {
         shuffleMaster.setAutoCompleteRegistration(false);
 
         final List<ResultPartitionID> trackedPartitions = new ArrayList<>();
@@ -1601,12 +1588,12 @@ public class DefaultSchedulerTest extends TestLogger {
 
         // late registered partitions will not be tracked and will be released
         shuffleMaster.completeAllPendingRegistrations();
-        assertThat(trackedPartitions, hasSize(0));
-        assertThat(shuffleMaster.getExternallyReleasedPartitions(), 
hasSize(1));
+        assertThat(trackedPartitions).isEmpty();
+        assertThat(shuffleMaster.getExternallyReleasedPartitions()).hasSize(1);
     }
 
     @Test
-    public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws 
Exception {
+    void testCheckpointCleanerIsClosedAfterCheckpointServices() throws 
Exception {
         final ScheduledExecutorService executorService =
                 Executors.newSingleThreadScheduledExecutor();
         try {
@@ -1635,17 +1622,17 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testJobStatusHookWithJobFailed() throws Exception {
+    void testJobStatusHookWithJobFailed() throws Exception {
         commonJobStatusHookTest(ExecutionState.FAILED, JobStatus.FAILED);
     }
 
     @Test
-    public void testJobStatusHookWithJobCanceled() throws Exception {
+    void testJobStatusHookWithJobCanceled() throws Exception {
         commonJobStatusHookTest(ExecutionState.CANCELED, JobStatus.CANCELED);
     }
 
     @Test
-    public void testJobStatusHookWithJobFinished() throws Exception {
+    void testJobStatusHookWithJobFinished() throws Exception {
         commonJobStatusHookTest(ExecutionState.FINISHED, JobStatus.FINISHED);
     }
 
@@ -1701,14 +1688,10 @@ public class DefaultSchedulerTest extends TestLogger {
 
         waitForTermination(scheduler);
         final JobStatus jobStatus = scheduler.requestJobStatus();
-        
org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus);
-        
org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1);
-        org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0))
-                .isEqualTo(jobGraph.getJobID());
+        assertThat(jobStatus).isEqualTo(expectedJobStatus);
+        
assertThat(onCreatedJobList).singleElement().isEqualTo(jobGraph.getJobID());
 
-        org.assertj.core.api.Assertions.assertThat(onJobStatusList).hasSize(1);
-        org.assertj.core.api.Assertions.assertThat(onJobStatusList.get(0))
-                .isEqualTo(jobGraph.getJobID());
+        
assertThat(onCreatedJobList).singleElement().isEqualTo(jobGraph.getJobID());
     }
 
     /**
@@ -1775,9 +1758,9 @@ public class DefaultSchedulerTest extends TestLogger {
 
         // Wait for scheduler to start closing.
         schedulerClosing.await();
-        assertFalse(
-                "CheckpointCleaner should not close before checkpoint 
services.",
-                cleanerClosed.await(10, TimeUnit.MILLISECONDS));
+        assertThat(cleanerClosed.await(10, TimeUnit.MILLISECONDS))
+                .withFailMessage("CheckpointCleaner should not close before 
checkpoint services.")
+                .isFalse();
         checkpointServicesShutdownBlocked.countDown();
         cleanerClosed.await();
         schedulerClosed.get();
@@ -1992,7 +1975,7 @@ public class DefaultSchedulerTest extends TestLogger {
         }
 
         /** Actually schedules the collected {@link ScheduledTask 
ScheduledTasks}. */
-        public void scheduleCollectedScheduledTasks() {
+        void scheduleCollectedScheduledTasks() {
             for (ScheduledTask<?> scheduledTask : scheduledTasks) {
                 super.schedule(
                         scheduledTask.getCallable(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
new file mode 100644
index 00000000000..08fad7e96b9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Objects;
+
+import static 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation;
+
+/** A utility class to matches {@link ExceptionHistoryEntry} instances for 
testing. */
+public class ExceptionHistoryEntryTestingUtils {
+
+    public static boolean matchesGlobalFailure(
+            ExceptionHistoryEntry exceptionHistoryEntry,
+            Throwable expectedException,
+            long expectedTimestamp) {
+        return matchesInternal(
+                exceptionHistoryEntry, expectedException, expectedTimestamp, 
null, null);
+    }
+
+    public static boolean matchesFailure(
+            ExceptionHistoryEntry exceptionHistoryEntry,
+            Throwable expectedException,
+            long expectedTimestamp,
+            String expectedTaskName,
+            TaskManagerLocation expectedTaskManagerLocation) {
+        return matchesInternal(
+                exceptionHistoryEntry,
+                expectedException,
+                expectedTimestamp,
+                expectedTaskName,
+                expectedTaskManagerLocation);
+    }
+
+    private static boolean matchesInternal(
+            ExceptionHistoryEntry exceptionHistoryEntry,
+            Throwable expectedException,
+            long expectedTimestamp,
+            String expectedTaskName,
+            TaskManagerLocation expectedTaskManagerLocation) {
+        boolean match =
+                exceptionHistoryEntry
+                                .getException()
+                                
.deserializeError(ClassLoader.getSystemClassLoader())
+                                .equals(expectedException)
+                        && exceptionHistoryEntry.getTimestamp() == 
expectedTimestamp
+                        && !Objects.equals(
+                                exceptionHistoryEntry.getFailingTaskName(), 
expectedTaskName);
+
+        match |=
+                matchesTaskManagerLocation(
+                        exceptionHistoryEntry.getTaskManagerLocation(),
+                        fromTaskManagerLocation(expectedTaskManagerLocation));
+
+        return match;
+    }
+
+    private static boolean matchesTaskManagerLocation(
+            ExceptionHistoryEntry.ArchivedTaskManagerLocation actual,
+            ExceptionHistoryEntry.ArchivedTaskManagerLocation 
expectedLocation) {
+        if (actual == null) {
+            return expectedLocation == null;
+        } else if (expectedLocation == null) {
+            return false;
+        }
+
+        return Objects.equals(actual.getAddress(), 
expectedLocation.getAddress())
+                && Objects.equals(actual.getFQDNHostname(), 
expectedLocation.getFQDNHostname())
+                && Objects.equals(actual.getHostname(), 
expectedLocation.getHostname())
+                && Objects.equals(actual.getResourceID(), 
expectedLocation.getResourceID())
+                && Objects.equals(actual.getPort(), 
expectedLocation.getPort());
+    }
+}

Reply via email to