This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35498b29836b0704ef092637d9728ce27f42d277
Author: Lijie Wang <[email protected]>
AuthorDate: Wed Oct 19 17:19:47 2022 +0800

    [hotfix][runtime] Migrate some related tests to JUnit 5
---
 .../DefaultExecutionGraphDeploymentTest.java       | 302 +++++++++------------
 ...tExecutionGraphDeploymentWithBlobCacheTest.java |  20 +-
 ...ExecutionGraphDeploymentWithBlobServerTest.java |  61 +++--
 ...hDeploymentWithSmallBlobCacheSizeLimitTest.java |  33 +--
 .../partition/SortMergeResultPartitionTest.java    | 186 ++++++-------
 .../metrics/groups/TaskIOMetricGroupTest.java      |  68 +++--
 .../DefaultVertexParallelismDeciderTest.java       |  47 ++--
 7 files changed, 334 insertions(+), 383 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index 05237422a71..32e6302ec88 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -66,14 +66,11 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.function.FunctionUtils;
 
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -87,19 +84,14 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link DefaultExecutionGraph} deployment. */
-public class DefaultExecutionGraphDeploymentTest extends TestLogger {
+class DefaultExecutionGraphDeploymentTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
+            TestingUtils.defaultExecutorExtension();
 
     /** BLOB server instance to use for the job graph. */
     protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
@@ -117,7 +109,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
      * @param eg the execution graph that was created
      */
     protected void checkJobOffloaded(DefaultExecutionGraph eg) throws 
Exception {
-        assertTrue(eg.getJobInformationOrBlobKey().isLeft());
+        assertThat(eg.getJobInformationOrBlobKey().isLeft()).isTrue();
     }
 
     /**
@@ -128,11 +120,11 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
      * @param jobVertexId job vertex ID
      */
     protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID 
jobVertexId) throws Exception {
-        
assertTrue(eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft());
+        
assertThat(eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft()).isTrue();
     }
 
     @Test
-    public void testBuildDeploymentDescriptor() throws Exception {
+    void testBuildDeploymentDescriptor() throws Exception {
 
         final JobVertexID jid1 = new JobVertexID();
         final JobVertexID jid2 = new JobVertexID();
@@ -194,7 +186,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
                         .setTaskManagerGateway(taskManagerGateway)
                         .createTestingLogicalSlot();
 
-        assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+        
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
         
vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
 
         vertex.getCurrentExecutionAttempt()
@@ -202,128 +194,111 @@ public class DefaultExecutionGraphDeploymentTest 
extends TestLogger {
                 .get();
         vertex.deployToSlot(slot);
 
-        assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+        
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
         checkTaskOffloaded(eg, vertex.getJobvertexId());
 
         TaskDeploymentDescriptor descr = tdd.get();
-        assertNotNull(descr);
+        assertThat(descr).isNotNull();
 
         JobInformation jobInformation =
                 
descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
         TaskInformation taskInformation =
                 
descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
 
-        assertEquals(jobId, descr.getJobId());
-        assertEquals(jobId, jobInformation.getJobId());
-        assertEquals(jid2, taskInformation.getJobVertexId());
-        assertEquals(3, descr.getSubtaskIndex());
-        assertEquals(10, taskInformation.getNumberOfSubtasks());
-        assertEquals(BatchTask.class.getName(), 
taskInformation.getInvokableClassName());
-        assertEquals("v2", taskInformation.getTaskName());
+        assertThat(descr.getJobId()).isEqualTo(jobId);
+        assertThat(jobInformation.getJobId()).isEqualTo(jobId);
+        assertThat(taskInformation.getJobVertexId()).isEqualTo(jid2);
+        assertThat(descr.getSubtaskIndex()).isEqualTo(3);
+        assertThat(taskInformation.getNumberOfSubtasks()).isEqualTo(10);
+        
assertThat(taskInformation.getInvokableClassName()).isEqualTo(BatchTask.class.getName());
+        assertThat(taskInformation.getTaskName()).isEqualTo("v2");
 
         Collection<ResultPartitionDeploymentDescriptor> producedPartitions =
                 descr.getProducedPartitions();
         Collection<InputGateDeploymentDescriptor> consumedPartitions = 
descr.getInputGates();
 
-        assertEquals(2, producedPartitions.size());
-        assertEquals(1, consumedPartitions.size());
+        assertThat(producedPartitions.size()).isEqualTo(2);
+        assertThat(consumedPartitions.size()).isEqualTo(1);
 
         Iterator<ResultPartitionDeploymentDescriptor> 
iteratorProducedPartitions =
                 producedPartitions.iterator();
         Iterator<InputGateDeploymentDescriptor> iteratorConsumedPartitions =
                 consumedPartitions.iterator();
 
-        assertEquals(10, 
iteratorProducedPartitions.next().getNumberOfSubpartitions());
-        assertEquals(10, 
iteratorProducedPartitions.next().getNumberOfSubpartitions());
+        
assertThat(iteratorProducedPartitions.next().getNumberOfSubpartitions()).isEqualTo(10);
+        
assertThat(iteratorProducedPartitions.next().getNumberOfSubpartitions()).isEqualTo(10);
 
         ShuffleDescriptor[] shuffleDescriptors =
                 iteratorConsumedPartitions.next().getShuffleDescriptors();
-        assertEquals(10, shuffleDescriptors.length);
+        assertThat(shuffleDescriptors.length).isEqualTo(10);
 
         Iterator<ConsumedPartitionGroup> iteratorConsumedPartitionGroup =
                 vertex.getAllConsumedPartitionGroups().iterator();
         int idx = 0;
         for (IntermediateResultPartitionID partitionId : 
iteratorConsumedPartitionGroup.next()) {
-            assertEquals(
-                    partitionId, 
shuffleDescriptors[idx++].getResultPartitionID().getPartitionId());
+            
assertThat(shuffleDescriptors[idx++].getResultPartitionID().getPartitionId())
+                    .isEqualTo(partitionId);
         }
     }
 
     @Test
-    public void testRegistrationOfExecutionsFinishing() {
-        try {
-            final JobVertexID jid1 = new JobVertexID();
-            final JobVertexID jid2 = new JobVertexID();
+    void testRegistrationOfExecutionsFinishing() throws Exception {
 
-            JobVertex v1 = new JobVertex("v1", jid1);
-            JobVertex v2 = new JobVertex("v2", jid2);
+        final JobVertexID jid1 = new JobVertexID();
+        final JobVertexID jid2 = new JobVertexID();
 
-            SchedulerBase scheduler = setupScheduler(v1, 7650, v2, 2350);
-            Collection<Execution> executions =
-                    new ArrayList<>(
-                            
scheduler.getExecutionGraph().getRegisteredExecutions().values());
+        JobVertex v1 = new JobVertex("v1", jid1);
+        JobVertex v2 = new JobVertex("v2", jid2);
 
-            for (Execution e : executions) {
-                e.markFinished();
-            }
+        SchedulerBase scheduler = setupScheduler(v1, 7650, v2, 2350);
+        Collection<Execution> executions =
+                new 
ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
 
-            assertEquals(0, 
scheduler.getExecutionGraph().getRegisteredExecutions().size());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        for (Execution e : executions) {
+            e.markFinished();
         }
+
+        
assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
     }
 
     @Test
-    public void testRegistrationOfExecutionsFailing() {
-        try {
-
-            final JobVertexID jid1 = new JobVertexID();
-            final JobVertexID jid2 = new JobVertexID();
+    void testRegistrationOfExecutionsFailing() throws Exception {
 
-            JobVertex v1 = new JobVertex("v1", jid1);
-            JobVertex v2 = new JobVertex("v2", jid2);
+        final JobVertexID jid1 = new JobVertexID();
+        final JobVertexID jid2 = new JobVertexID();
 
-            SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6);
-            Collection<Execution> executions =
-                    new ArrayList<>(
-                            
scheduler.getExecutionGraph().getRegisteredExecutions().values());
+        JobVertex v1 = new JobVertex("v1", jid1);
+        JobVertex v2 = new JobVertex("v2", jid2);
 
-            for (Execution e : executions) {
-                e.markFailed(null);
-            }
+        SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6);
+        Collection<Execution> executions =
+                new 
ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
 
-            assertEquals(0, 
scheduler.getExecutionGraph().getRegisteredExecutions().size());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        for (Execution e : executions) {
+            e.markFailed(null);
         }
+
+        
assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
     }
 
     @Test
-    public void testRegistrationOfExecutionsFailedExternally() {
-        try {
+    void testRegistrationOfExecutionsFailedExternally() throws Exception {
 
-            final JobVertexID jid1 = new JobVertexID();
-            final JobVertexID jid2 = new JobVertexID();
-
-            JobVertex v1 = new JobVertex("v1", jid1);
-            JobVertex v2 = new JobVertex("v2", jid2);
+        final JobVertexID jid1 = new JobVertexID();
+        final JobVertexID jid2 = new JobVertexID();
 
-            SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6);
-            Collection<Execution> executions =
-                    new ArrayList<>(
-                            
scheduler.getExecutionGraph().getRegisteredExecutions().values());
+        JobVertex v1 = new JobVertex("v1", jid1);
+        JobVertex v2 = new JobVertex("v2", jid2);
 
-            for (Execution e : executions) {
-                e.fail(null);
-            }
+        SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6);
+        Collection<Execution> executions =
+                new 
ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
 
-            assertEquals(0, 
scheduler.getExecutionGraph().getRegisteredExecutions().size());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        for (Execution e : executions) {
+            e.fail(null);
         }
+
+        
assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
     }
 
     /**
@@ -331,7 +306,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
      * accumulators and metrics for an execution that failed or was canceled.
      */
     @Test
-    public void testAccumulatorsAndMetricsForwarding() throws Exception {
+    void testAccumulatorsAndMetricsForwarding() throws Exception {
         final JobVertexID jid1 = new JobVertexID();
         final JobVertexID jid2 = new JobVertexID();
 
@@ -361,9 +336,9 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
 
         scheduler.updateTaskExecutionState(state);
 
-        assertEquals(ioMetrics, execution1.getIOMetrics());
-        assertNotNull(execution1.getUserAccumulators());
-        assertEquals(4, 
execution1.getUserAccumulators().get("acc").getLocalValue());
+        assertThat(execution1.getIOMetrics()).isEqualTo(ioMetrics);
+        assertThat(execution1.getUserAccumulators()).isNotNull();
+        
assertThat(execution1.getUserAccumulators().get("acc").getLocalValue()).isEqualTo(4);
 
         // verify behavior for failed executions
         Execution execution2 = executions.values().iterator().next();
@@ -384,9 +359,9 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
 
         scheduler.updateTaskExecutionState(state2);
 
-        assertEquals(ioMetrics2, execution2.getIOMetrics());
-        assertNotNull(execution2.getUserAccumulators());
-        assertEquals(8, 
execution2.getUserAccumulators().get("acc").getLocalValue());
+        assertThat(execution2.getIOMetrics()).isEqualTo(ioMetrics2);
+        assertThat(execution2.getUserAccumulators()).isNotNull();
+        
assertThat(execution2.getUserAccumulators().get("acc").getLocalValue()).isEqualTo(8);
     }
 
     /**
@@ -395,7 +370,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
      * accumulators and metrics correctly.
      */
     @Test
-    public void testAccumulatorsAndMetricsStorage() throws Exception {
+    void testAccumulatorsAndMetricsStorage() throws Exception {
         final JobVertexID jid1 = new JobVertexID();
         final JobVertexID jid2 = new JobVertexID();
 
@@ -413,41 +388,35 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
         execution1.cancel();
         execution1.completeCancelling(accumulators, ioMetrics, false);
 
-        assertEquals(ioMetrics, execution1.getIOMetrics());
-        assertEquals(accumulators, execution1.getUserAccumulators());
+        assertThat(execution1.getIOMetrics()).isEqualTo(ioMetrics);
+        assertThat(execution1.getUserAccumulators()).isEqualTo(accumulators);
 
         Execution execution2 = executions.values().iterator().next();
         execution2.markFailed(new Throwable(), false, accumulators, ioMetrics, 
false, true);
 
-        assertEquals(ioMetrics, execution2.getIOMetrics());
-        assertEquals(accumulators, execution2.getUserAccumulators());
+        assertThat(execution2.getIOMetrics()).isEqualTo(ioMetrics);
+        assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
     }
 
     @Test
-    public void testRegistrationOfExecutionsCanceled() {
-        try {
-
-            final JobVertexID jid1 = new JobVertexID();
-            final JobVertexID jid2 = new JobVertexID();
+    void testRegistrationOfExecutionsCanceled() throws Exception {
 
-            JobVertex v1 = new JobVertex("v1", jid1);
-            JobVertex v2 = new JobVertex("v2", jid2);
+        final JobVertexID jid1 = new JobVertexID();
+        final JobVertexID jid2 = new JobVertexID();
 
-            SchedulerBase scheduler = setupScheduler(v1, 19, v2, 37);
-            Collection<Execution> executions =
-                    new ArrayList<>(
-                            
scheduler.getExecutionGraph().getRegisteredExecutions().values());
+        JobVertex v1 = new JobVertex("v1", jid1);
+        JobVertex v2 = new JobVertex("v2", jid2);
 
-            for (Execution e : executions) {
-                e.cancel();
-                e.completeCancelling();
-            }
+        SchedulerBase scheduler = setupScheduler(v1, 19, v2, 37);
+        Collection<Execution> executions =
+                new 
ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
 
-            assertEquals(0, 
scheduler.getExecutionGraph().getRegisteredExecutions().size());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        for (Execution e : executions) {
+            e.cancel();
+            e.completeCancelling();
         }
+
+        
assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
     }
 
     /**
@@ -456,7 +425,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
      * swallow the fail exception when scheduling a consumer task.
      */
     @Test
-    public void testNoResourceAvailableFailure() throws Exception {
+    void testNoResourceAvailableFailure() throws Exception {
         JobVertex v1 = new JobVertex("source");
         JobVertex v2 = new JobVertex("sink");
 
@@ -481,7 +450,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
                 new DefaultSchedulerBuilder(
                                 graph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                EXECUTOR_RESOURCE.getExecutor())
+                                EXECUTOR_EXTENSION.getExecutor())
                         .setExecutionSlotAllocatorFactory(
                                 
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                         TestingPhysicalSlotProvider
@@ -507,7 +476,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
         scheduler.updateTaskExecutionState(
                 new TaskExecutionState(attemptID, ExecutionState.FINISHED, 
null));
 
-        assertEquals(JobStatus.FAILED, eg.getState());
+        assertThat(eg.getState()).isEqualTo(JobStatus.FAILED);
     }
 
     // ------------------------------------------------------------------------
@@ -515,16 +484,16 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws 
Exception {
+    void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
         final Configuration jobManagerConfig = new Configuration();
 
         final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
 
-        assertEquals(
-                
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
-                eg.getCheckpointCoordinator()
-                        .getCheckpointStore()
-                        .getMaxNumberOfRetainedCheckpoints());
+        assertThat(
+                        eg.getCheckpointCoordinator()
+                                .getCheckpointStore()
+                                .getMaxNumberOfRetainedCheckpoints())
+                
.isEqualTo(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue());
     }
 
     private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, 
int dop2)
@@ -542,7 +511,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
                 new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(v1, v2),
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                EXECUTOR_RESOURCE.getExecutor())
+                                EXECUTOR_EXTENSION.getExecutor())
                         .setExecutionSlotAllocatorFactory(
                                 
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
                         .setFutureExecutor(executorService)
@@ -556,13 +525,13 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
         scheduler.startScheduling();
 
         Map<ExecutionAttemptID, Execution> executions = 
eg.getRegisteredExecutions();
-        assertEquals(dop1 + dop2, executions.size());
+        assertThat(executions.size()).isEqualTo(dop1 + dop2);
 
         return scheduler;
     }
 
     @Test
-    public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws 
Exception {
+    void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
 
         final int negativeMaxNumberOfCheckpointsToRetain = -10;
 
@@ -573,22 +542,22 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
 
         final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
 
-        assertNotEquals(
-                negativeMaxNumberOfCheckpointsToRetain,
-                eg.getCheckpointCoordinator()
-                        .getCheckpointStore()
-                        .getMaxNumberOfRetainedCheckpoints());
-
-        assertEquals(
-                
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
-                eg.getCheckpointCoordinator()
-                        .getCheckpointStore()
-                        .getMaxNumberOfRetainedCheckpoints());
+        assertThat(
+                        eg.getCheckpointCoordinator()
+                                .getCheckpointStore()
+                                .getMaxNumberOfRetainedCheckpoints())
+                .isNotEqualTo(negativeMaxNumberOfCheckpointsToRetain);
+
+        assertThat(
+                        eg.getCheckpointCoordinator()
+                                .getCheckpointStore()
+                                .getMaxNumberOfRetainedCheckpoints())
+                
.isEqualTo(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue());
     }
 
     /** Tests that the {@link ExecutionGraph} is deployed in topological 
order. */
     @Test
-    public void testExecutionGraphIsDeployedInTopologicalOrder() throws 
Exception {
+    void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
         final int sourceParallelism = 2;
         final int sinkParallelism = 1;
 
@@ -628,7 +597,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
                 new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                EXECUTOR_RESOURCE.getExecutor())
+                                EXECUTOR_EXTENSION.getExecutor())
                         .setExecutionSlotAllocatorFactory(
                                 
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                         physicalSlotProvider))
@@ -670,7 +639,9 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
         }
 
         assertThat(
-                submittedTasks, new 
ExecutionStageMatcher(Arrays.asList(firstStage, secondStage)));
+                        isDeployedInTopologicalOrder(
+                                submittedTasks, Arrays.asList(firstStage, 
secondStage)))
+                .isTrue();
     }
 
     private ExecutionGraph createExecutionGraph(Configuration configuration) 
throws Exception {
@@ -693,41 +664,28 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
                 .setJobGraph(jobGraph)
                 .setJobMasterConfig(configuration)
                 .setBlobWriter(blobWriter)
-                .build(EXECUTOR_RESOURCE.getExecutor());
+                .build(EXECUTOR_EXTENSION.getExecutor());
     }
 
-    private static final class ExecutionStageMatcher
-            extends TypeSafeMatcher<List<ExecutionAttemptID>> {
-        private final List<Collection<ExecutionAttemptID>> executionStages;
-
-        private ExecutionStageMatcher(List<Collection<ExecutionAttemptID>> 
executionStages) {
-            this.executionStages = executionStages;
-        }
-
-        @Override
-        protected boolean matchesSafely(List<ExecutionAttemptID> 
submissionOrder) {
-            final Iterator<ExecutionAttemptID> submissionIterator = 
submissionOrder.iterator();
+    private boolean isDeployedInTopologicalOrder(
+            List<ExecutionAttemptID> submissionOrder,
+            List<Collection<ExecutionAttemptID>> executionStages) {
+        final Iterator<ExecutionAttemptID> submissionIterator = 
submissionOrder.iterator();
 
-            for (Collection<ExecutionAttemptID> stage : executionStages) {
-                final Collection<ExecutionAttemptID> currentStage = new 
ArrayList<>(stage);
+        for (Collection<ExecutionAttemptID> stage : executionStages) {
+            final Collection<ExecutionAttemptID> currentStage = new 
ArrayList<>(stage);
 
-                while (!currentStage.isEmpty() && 
submissionIterator.hasNext()) {
-                    if (!currentStage.remove(submissionIterator.next())) {
-                        return false;
-                    }
-                }
-
-                if (!currentStage.isEmpty()) {
+            while (!currentStage.isEmpty() && submissionIterator.hasNext()) {
+                if (!currentStage.remove(submissionIterator.next())) {
                     return false;
                 }
             }
 
-            return !submissionIterator.hasNext();
+            if (!currentStage.isEmpty()) {
+                return false;
+            }
         }
 
-        @Override
-        public void describeTo(Description description) {
-            description.appendValueList("<[", ", ", "]>", executionStages);
-        }
+        return !submissionIterator.hasNext();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java
index 2d2509cf028..8ff9b0af5ca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobCacheTest.java
@@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -34,26 +35,31 @@ import java.net.InetSocketAddress;
  * Tests {@link ExecutionGraph} deployment when offloading job and task 
information into the BLOB
  * server.
  */
-public class DefaultExecutionGraphDeploymentWithBlobCacheTest
+class DefaultExecutionGraphDeploymentWithBlobCacheTest
         extends DefaultExecutionGraphDeploymentWithBlobServerTest {
 
-    @Before
+    @BeforeEach
     @Override
     public void setupBlobServer() throws IOException {
         Configuration config = new Configuration();
         // always offload the serialized job and task information
         config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
-        blobServer = new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new 
VoidBlobStore());
+        blobServer =
+                new BlobServer(
+                        config, TempDirUtils.newFolder(temporaryFolder), new 
VoidBlobStore());
         blobServer.start();
         blobWriter = blobServer;
 
         InetSocketAddress serverAddress = new InetSocketAddress("localhost", 
blobServer.getPort());
         blobCache =
                 new PermanentBlobCache(
-                        config, TEMPORARY_FOLDER.newFolder(), new 
VoidBlobStore(), serverAddress);
+                        config,
+                        TempDirUtils.newFolder(temporaryFolder),
+                        new VoidBlobStore(),
+                        serverAddress);
     }
 
-    @After
+    @AfterEach
     @Override
     public void shutdownBlobServer() throws IOException {
         if (blobServer != null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java
index a5ed097272b..41391484c04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithBlobServerTest.java
@@ -22,71 +22,57 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.SerializedValue;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests {@link ExecutionGraph} deployment when offloading job and task 
information into the BLOB
  * server.
  */
-public class DefaultExecutionGraphDeploymentWithBlobServerTest
+class DefaultExecutionGraphDeploymentWithBlobServerTest
         extends DefaultExecutionGraphDeploymentTest {
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+    @TempDir Path temporaryFolder;
 
     private Set<byte[]> seenHashes =
             Collections.newSetFromMap(new ConcurrentHashMap<byte[], 
Boolean>());
 
     protected BlobServer blobServer = null;
 
-    @Before
+    @BeforeEach
     public void setupBlobServer() throws IOException {
         Configuration config = new Configuration();
         // always offload the serialized job and task information
         config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
         blobServer =
-                Mockito.spy(
-                        new BlobServer(config, TEMPORARY_FOLDER.newFolder(), 
new VoidBlobStore()));
+                new AssertBlobServer(
+                        config, TempDirUtils.newFolder(temporaryFolder), new 
VoidBlobStore());
         blobWriter = blobServer;
         blobCache = blobServer;
 
         seenHashes.clear();
-
-        // verify that we do not upload the same content more than once
-        doAnswer(
-                        invocation -> {
-                            PermanentBlobKey key = (PermanentBlobKey) 
invocation.callRealMethod();
-
-                            assertTrue(seenHashes.add(key.getHash()));
-
-                            return key;
-                        })
-                .when(blobServer)
-                .putPermanent(any(JobID.class), Matchers.<byte[]>any());
-
         blobServer.start();
     }
 
-    @After
+    @AfterEach
     public void shutdownBlobServer() throws IOException {
         if (blobServer != null) {
             blobServer.close();
@@ -98,7 +84,7 @@ public class DefaultExecutionGraphDeploymentWithBlobServerTest
         Either<SerializedValue<JobInformation>, PermanentBlobKey> 
jobInformationOrBlobKey =
                 eg.getJobInformationOrBlobKey();
 
-        assertTrue(jobInformationOrBlobKey.isRight());
+        assertThat(jobInformationOrBlobKey.isRight()).isTrue();
 
         // must not throw:
         blobServer.getFile(eg.getJobID(), jobInformationOrBlobKey.right());
@@ -109,9 +95,24 @@ public class 
DefaultExecutionGraphDeploymentWithBlobServerTest
         Either<SerializedValue<TaskInformation>, PermanentBlobKey> 
taskInformationOrBlobKey =
                 eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey();
 
-        assertTrue(taskInformationOrBlobKey.isRight());
+        assertThat(taskInformationOrBlobKey.isRight()).isTrue();
 
         // must not throw:
         blobServer.getFile(eg.getJobID(), taskInformationOrBlobKey.right());
     }
+
+    private class AssertBlobServer extends BlobServer {
+        public AssertBlobServer(Configuration config, File storageDir, 
BlobStore blobStore)
+                throws IOException {
+            super(config, storageDir, blobStore);
+        }
+
+        @Override
+        public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws 
IOException {
+            PermanentBlobKey key = super.putPermanent(jobId, value);
+            // verify that we do not upload the same content more than once
+            assertThat(seenHashes.add(key.getHash())).isTrue();
+            return key;
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
index 2ea30d6df44..4b6da6f72b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
@@ -43,10 +43,11 @@ import 
org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.function.FunctionUtils;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -55,8 +56,7 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests {@link ExecutionGraph} deployment when job and task information are 
offloaded into the BLOB
@@ -65,17 +65,19 @@ import static org.junit.Assert.assertNotNull;
  * even the size limit of {@link BlobCacheSizeTracker} in {@link 
PermanentBlobCache} is set to the
  * minimum value.
  */
-public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
+class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
         extends DefaultExecutionGraphDeploymentWithBlobCacheTest {
 
-    @Before
+    @BeforeEach
     @Override
     public void setupBlobServer() throws IOException {
         Configuration config = new Configuration();
         // Always offload the serialized JobInformation, TaskInformation and 
cached
         // ShuffleDescriptors
         config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
-        blobServer = new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new 
VoidBlobStore());
+        blobServer =
+                new BlobServer(
+                        config, TempDirUtils.newFolder(temporaryFolder), new 
VoidBlobStore());
         blobServer.start();
         blobWriter = blobServer;
 
@@ -85,7 +87,7 @@ public class 
DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
         blobCache =
                 new PermanentBlobCache(
                         config,
-                        TEMPORARY_FOLDER.newFolder(),
+                        TempDirUtils.newFolder(temporaryFolder),
                         new VoidBlobStore(),
                         serverAddress,
                         blobCacheSizeTracker);
@@ -103,7 +105,7 @@ public class 
DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
      * larger than 1 and the deletion won't happen so frequently.
      */
     @Test
-    public void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws 
Exception {
+    void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception 
{
 
         final int numberOfVertices = 4;
         final int parallelism = 10;
@@ -124,7 +126,7 @@ public class 
DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
         for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
             for (ExecutionVertex ev : ejv.getTaskVertices()) {
 
-                assertEquals(ExecutionState.CREATED, ev.getExecutionState());
+                
assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.CREATED);
 
                 LogicalSlot slot =
                         new TestingLogicalSlotBuilder()
@@ -134,13 +136,13 @@ public class 
DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
                 execution.transitionState(ExecutionState.SCHEDULED);
                 
execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
                 ev.deployToSlot(slot);
-                assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState());
+                
assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
 
                 TaskDeploymentDescriptor tdd = tdds.take();
-                assertNotNull(tdd);
+                assertThat(tdd).isNotNull();
 
                 List<InputGateDeploymentDescriptor> igdds = 
tdd.getInputGates();
-                assertEquals(ev.getAllConsumedPartitionGroups().size(), 
igdds.size());
+                
assertThat(igdds).hasSize(ev.getAllConsumedPartitionGroups().size());
 
                 if (igdds.size() > 0) {
                     checkShuffleDescriptors(igdds.get(0), 
ev.getConsumedPartitionGroup(0));
@@ -188,9 +190,8 @@ public class 
DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
             InputGateDeploymentDescriptor igdd, ConsumedPartitionGroup 
consumedPartitionGroup) {
         int idx = 0;
         for (IntermediateResultPartitionID consumedPartitionId : 
consumedPartitionGroup) {
-            assertEquals(
-                    consumedPartitionId,
-                    
igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId());
+            
assertThat(igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId())
+                    .isEqualTo(consumedPartitionId);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
index f4650e75df1..1403b60dff2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
@@ -30,40 +30,38 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.ArrayDeque;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link SortMergeResultPartition}. */
-@RunWith(Parameterized.class)
-public class SortMergeResultPartitionTest extends TestLogger {
+@ExtendWith(ParameterizedTestExtension.class)
+public class SortMergeResultPartitionTest {
 
     private static final int bufferSize = 1024;
 
@@ -73,7 +71,7 @@ public class SortMergeResultPartitionTest extends TestLogger {
 
     private static final int numThreads = 4;
 
-    private final boolean useHashDataBuffer;
+    @Parameter public boolean useHashDataBuffer;
 
     private final TestBufferAvailabilityListener listener = new 
TestBufferAvailabilityListener();
 
@@ -85,38 +83,33 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
 
     private ExecutorService readIOExecutor;
 
-    @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();
+    @TempDir private Path tmpFolder;
 
-    @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
-
-    @Before
-    public void setUp() {
+    @BeforeEach
+    void setUp() throws IOException {
         fileChannelManager =
-                new FileChannelManagerImpl(new String[] 
{tmpFolder.getRoot().getPath()}, "testing");
+                new FileChannelManagerImpl(
+                        new String[] 
{TempDirUtils.newFolder(tmpFolder).toString()}, "testing");
         globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
         readBufferPool = new BatchShuffleReadBufferPool(totalBytes, 
bufferSize);
         readIOExecutor = Executors.newFixedThreadPool(numThreads);
     }
 
-    @After
-    public void shutdown() throws Exception {
+    @AfterEach
+    void shutdown() throws Exception {
         fileChannelManager.close();
         globalPool.destroy();
         readBufferPool.destroy();
         readIOExecutor.shutdown();
     }
 
-    @Parameterized.Parameters(name = "UseHashDataBuffer = {0}")
-    public static Object[] parameters() {
-        return new Object[] {true, false};
-    }
-
-    public SortMergeResultPartitionTest(boolean useHashDataBuffer) {
-        this.useHashDataBuffer = useHashDataBuffer;
+    @Parameters(name = "useHashDataBuffer={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
     }
 
-    @Test
-    public void testWriteAndRead() throws Exception {
+    @TestTemplate
+    void testWriteAndRead() throws Exception {
         int numBuffers = useHashDataBuffer ? 100 : 15;
         int numSubpartitions = 10;
         int numRecords = 1000;
@@ -232,8 +225,8 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
 
                     if (!buffer.isBuffer()) {
                         ++numEndOfPartitionEvents;
-                        assertFalse(
-                                
view.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
+                        
assertThat(view.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
+                                .isFalse();
                         view.releaseAllResources();
                     }
                     bufferAndBacklog = view.getNextBuffer();
@@ -252,16 +245,16 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
         return views;
     }
 
-    @Test
-    public void testWriteLargeRecord() throws Exception {
+    @TestTemplate
+    void testWriteLargeRecord() throws Exception {
         int numBuffers = useHashDataBuffer ? 100 : 15;
         BufferPool bufferPool = globalPool.createBufferPool(numBuffers, 
numBuffers);
         SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
 
         ByteBuffer recordWritten = generateRandomData(bufferSize * numBuffers, 
new Random());
         partition.emitRecord(recordWritten, 0);
-        assertEquals(
-                useHashDataBuffer ? numBuffers : 0, 
bufferPool.bestEffortGetNumOfUsedBuffers());
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers())
+                .isEqualTo(useHashDataBuffer ? numBuffers : 0);
 
         partition.finish();
         partition.close();
@@ -286,11 +279,11 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
                 });
         recordWritten.rewind();
         recordRead.flip();
-        assertEquals(recordWritten, recordRead);
+        assertThat(recordRead).isEqualTo(recordWritten);
     }
 
-    @Test
-    public void testDataBroadcast() throws Exception {
+    @TestTemplate
+    void testDataBroadcast() throws Exception {
         int numSubpartitions = 10;
         int numBuffers = useHashDataBuffer ? 100 : 15;
         int numRecords = 10000;
@@ -308,11 +301,11 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
 
         int eventSize = 
EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE).remaining();
         long dataSize = numSubpartitions * numRecords * bufferSize + 
numSubpartitions * eventSize;
-        assertNotNull(partition.getResultFile());
-        assertEquals(2, 
checkNotNull(fileChannelManager.getPaths()[0].list()).length);
+        assertThat(partition.getResultFile()).isNotNull();
+        
assertThat(checkNotNull(fileChannelManager.getPaths()[0].list()).length).isEqualTo(2);
         for (File file : 
checkNotNull(fileChannelManager.getPaths()[0].listFiles())) {
             if (file.getName().endsWith(PartitionedFile.DATA_FILE_SUFFIX)) {
-                assertTrue(file.length() < numSubpartitions * numRecords * 
bufferSize);
+                assertThat(file.length()).isLessThan(numSubpartitions * 
numRecords * bufferSize);
             }
         }
 
@@ -323,49 +316,47 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
                         bufferWithChannel -> {
                             bufferWithChannel.getBuffer().recycleBuffer();
                         });
-        assertEquals(dataSize, dataRead);
+        assertThat(dataRead).isEqualTo(dataSize);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testReleaseWhileWriting() throws Exception {
+    @TestTemplate
+    void testReleaseWhileWriting() throws Exception {
         int numBuffers = useHashDataBuffer ? 100 : 15;
 
         BufferPool bufferPool = globalPool.createBufferPool(numBuffers, 
numBuffers);
         SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
-        assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers());
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
 
         partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 
1)), 0);
         partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 
1)), 1);
 
         partition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
-        assertNull(partition.getResultFile());
-        assertEquals(2, fileChannelManager.getPaths()[0].list().length);
+        assertThat(partition.getResultFile()).isNull();
+        
assertThat(fileChannelManager.getPaths()[0].list().length).isEqualTo(2);
 
         partition.release();
-        try {
-            partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 
2);
-        } catch (IllegalStateException exception) {
-            assertEquals(0, fileChannelManager.getPaths()[0].list().length);
 
-            throw exception;
-        }
+        assertThatThrownBy(
+                        () -> 
partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 2))
+                .isInstanceOf(IllegalStateException.class);
+        
assertThat(fileChannelManager.getPaths()[0].list().length).isEqualTo(0);
     }
 
-    @Test
-    public void testRelease() throws Exception {
+    @TestTemplate
+    void testRelease() throws Exception {
         int numBuffers = useHashDataBuffer ? 100 : 15;
 
         BufferPool bufferPool = globalPool.createBufferPool(numBuffers, 
numBuffers);
         SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
-        assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers());
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
 
         partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 
1)), 0);
         partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 
1)), 1);
         partition.finish();
         partition.close();
 
-        assertEquals(3, partition.getResultFile().getNumRegions());
-        assertEquals(2, 
checkNotNull(fileChannelManager.getPaths()[0].list()).length);
+        assertThat(partition.getResultFile().getNumRegions()).isEqualTo(3);
+        
assertThat(checkNotNull(fileChannelManager.getPaths()[0].list()).length).isEqualTo(2);
 
         ResultSubpartitionView view = partition.createSubpartitionView(0, 
listener);
         partition.release();
@@ -381,57 +372,54 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
         while (partition.getResultFile() != null) {
             Thread.sleep(100);
         }
-        assertEquals(0, 
checkNotNull(fileChannelManager.getPaths()[0].list()).length);
+        
assertThat(checkNotNull(fileChannelManager.getPaths()[0].list()).length).isEqualTo(0);
     }
 
-    @Test
-    public void testCloseReleasesAllBuffers() throws Exception {
+    @TestTemplate
+    void testCloseReleasesAllBuffers() throws Exception {
         int numBuffers = useHashDataBuffer ? 100 : 15;
 
         BufferPool bufferPool = globalPool.createBufferPool(numBuffers, 
numBuffers);
         SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
-        assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers());
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
 
         partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 
1)), 5);
-        assertEquals(
-                useHashDataBuffer ? numBuffers : 0, 
bufferPool.bestEffortGetNumOfUsedBuffers());
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers())
+                .isEqualTo(useHashDataBuffer ? numBuffers : 0);
 
         partition.close();
-        assertTrue(bufferPool.isDestroyed());
-        assertEquals(totalBuffers, 
globalPool.getNumberOfAvailableMemorySegments());
+        assertThat(bufferPool.isDestroyed()).isTrue();
+        
assertThat(globalPool.getNumberOfAvailableMemorySegments()).isEqualTo(totalBuffers);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testReadUnfinishedPartition() throws Exception {
+    @TestTemplate
+    void testReadUnfinishedPartition() throws Exception {
         BufferPool bufferPool = globalPool.createBufferPool(10, 10);
-        try {
-            SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
-            partition.createSubpartitionView(0, listener);
-        } finally {
-            bufferPool.lazyDestroy();
-        }
+        SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
+        assertThatThrownBy(() -> partition.createSubpartitionView(0, listener))
+                .isInstanceOf(IllegalStateException.class);
+        bufferPool.lazyDestroy();
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testReadReleasedPartition() throws Exception {
+    @TestTemplate
+    void testReadReleasedPartition() throws Exception {
         BufferPool bufferPool = globalPool.createBufferPool(10, 10);
-        try {
-            SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
-            partition.finish();
-            partition.release();
-            partition.createSubpartitionView(0, listener);
-        } finally {
-            bufferPool.lazyDestroy();
-        }
+        SortMergeResultPartition partition = createSortMergedPartition(10, 
bufferPool);
+        partition.finish();
+        partition.release();
+
+        assertThatThrownBy(() -> partition.createSubpartitionView(0, listener))
+                .isInstanceOf(IllegalStateException.class);
+        bufferPool.lazyDestroy();
     }
 
-    @Test
-    public void testNumBytesProducedCounterForUnicast() throws IOException {
+    @TestTemplate
+    void testNumBytesProducedCounterForUnicast() throws IOException {
         testNumBytesProducedCounter(false);
     }
 
-    @Test
-    public void testNumBytesProducedCounterForBroadcast() throws IOException {
+    @TestTemplate
+    void testNumBytesProducedCounterForBroadcast() throws IOException {
         testNumBytesProducedCounter(true);
     }
 
@@ -447,14 +435,16 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
             partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
             partition.finish();
 
-            assertEquals(bufferSize + 4, 
partition.numBytesProduced.getCount());
-            assertEquals(numSubpartitions * (bufferSize + 4), 
partition.numBytesOut.getCount());
+            
assertThat(partition.numBytesProduced.getCount()).isEqualTo(bufferSize + 4);
+            assertThat(partition.numBytesOut.getCount())
+                    .isEqualTo(numSubpartitions * (bufferSize + 4));
         } else {
             partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
             partition.finish();
 
-            assertEquals(bufferSize + 4, 
partition.numBytesProduced.getCount());
-            assertEquals(bufferSize + numSubpartitions * 4, 
partition.numBytesOut.getCount());
+            
assertThat(partition.numBytesProduced.getCount()).isEqualTo(bufferSize + 4);
+            assertThat(partition.numBytesOut.getCount())
+                    .isEqualTo(bufferSize + numSubpartitions * 4);
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 71a978636bd..0faab8c1815 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -23,27 +23,24 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Map;
 
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link TaskIOMetricGroup}. */
-public class TaskIOMetricGroupTest {
+class TaskIOMetricGroupTest {
     @Test
-    public void testTaskIOMetricGroup() throws InterruptedException {
+    void testTaskIOMetricGroup() throws InterruptedException {
         TaskMetricGroup task = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
         TaskIOMetricGroup taskIO = task.getIOMetricGroup();
         taskIO.setEnableBusyTime(true);
         final long startTime = System.currentTimeMillis();
 
         // test counter forwarding
-        assertNotNull(taskIO.getNumRecordsInCounter());
-        assertNotNull(taskIO.getNumRecordsOutCounter());
+        assertThat(taskIO.getNumRecordsInCounter()).isNotNull();
+        assertThat(taskIO.getNumRecordsOutCounter()).isNotNull();
 
         Counter c1 = new SimpleCounter();
         c1.inc(32L);
@@ -52,8 +49,9 @@ public class TaskIOMetricGroupTest {
 
         taskIO.reuseRecordsInputCounter(c1);
         taskIO.reuseRecordsOutputCounter(c2);
-        assertEquals(32L, taskIO.getNumRecordsInCounter().getCount());
-        assertEquals(64L, taskIO.getNumRecordsOutCounter().getCount());
+
+        assertThat(taskIO.getNumRecordsInCounter().getCount()).isEqualTo(32L);
+        assertThat(taskIO.getNumRecordsOutCounter().getCount()).isEqualTo(64L);
 
         // test IOMetrics instantiation
         taskIO.getNumBytesInCounter().inc(100L);
@@ -72,35 +70,33 @@ public class TaskIOMetricGroupTest {
         taskIO.getHardBackPressuredTimePerSecond().markEnd();
 
         IOMetrics io = taskIO.createSnapshot();
-        assertEquals(32L, io.getNumRecordsIn());
-        assertEquals(64L, io.getNumRecordsOut());
-        assertEquals(100L, io.getNumBytesIn());
-        assertEquals(250L, io.getNumBytesOut());
-        assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
-        assertEquals(
-                taskIO.getIdleTimeMsPerSecond().getAccumulatedCount(), 
io.getAccumulateIdleTime());
-        assertEquals(
-                
taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount()
-                        + 
taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount(),
-                io.getAccumulateBackPressuredTime());
+        assertThat(io.getNumRecordsIn()).isEqualTo(32L);
+        assertThat(io.getNumRecordsOut()).isEqualTo(64L);
+        assertThat(io.getNumBytesIn()).isEqualTo(100L);
+        assertThat(io.getNumBytesOut()).isEqualTo(250L);
+        assertThat(taskIO.getNumBuffersOutCounter().getCount()).isEqualTo(3L);
+        assertThat(taskIO.getIdleTimeMsPerSecond().getAccumulatedCount())
+                .isEqualTo(io.getAccumulateIdleTime());
         assertThat(
-                io.getAccumulateBusyTime(),
-                greaterThanOrEqualTo(
+                        
taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount()
+                                + 
taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount())
+                .isEqualTo(io.getAccumulateBackPressuredTime());
+        assertThat(io.getAccumulateBusyTime())
+                .isGreaterThanOrEqualTo(
                         (double) System.currentTimeMillis()
                                 - startTime
                                 - io.getAccumulateIdleTime()
-                                - io.getAccumulateBackPressuredTime()));
-        assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), 
greaterThanOrEqualTo(softSleepTime));
-        assertThat(
-                taskIO.getSoftBackPressuredTimePerSecond().getCount(),
-                greaterThanOrEqualTo(softSleepTime));
-        assertThat(
-                taskIO.getHardBackPressuredTimePerSecond().getCount(),
-                greaterThanOrEqualTo(hardSleepTime));
+                                - io.getAccumulateBackPressuredTime());
+        assertThat(taskIO.getIdleTimeMsPerSecond().getCount())
+                .isGreaterThanOrEqualTo(softSleepTime);
+        assertThat(taskIO.getSoftBackPressuredTimePerSecond().getCount())
+                .isGreaterThanOrEqualTo(softSleepTime);
+        assertThat(taskIO.getHardBackPressuredTimePerSecond().getCount())
+                .isGreaterThanOrEqualTo(hardSleepTime);
     }
 
     @Test
-    public void testNumBytesProducedOfPartitionsMetrics() {
+    void testNumBytesProducedOfPartitionsMetrics() {
         TaskMetricGroup task = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
         TaskIOMetricGroup taskIO = task.getIOMetricGroup();
 
@@ -118,8 +114,8 @@ public class TaskIOMetricGroupTest {
         Map<IntermediateResultPartitionID, Long> numBytesProducedOfPartitions =
                 taskIO.createSnapshot().getNumBytesProducedOfPartitions();
 
-        assertEquals(2, numBytesProducedOfPartitions.size());
-        assertEquals(32L, 
numBytesProducedOfPartitions.get(resultPartitionID1).longValue());
-        assertEquals(64L, 
numBytesProducedOfPartitions.get(resultPartitionID2).longValue());
+        assertThat(numBytesProducedOfPartitions.size()).isEqualTo(2);
+        
assertThat(numBytesProducedOfPartitions.get(resultPartitionID1).longValue()).isEqualTo(32L);
+        
assertThat(numBytesProducedOfPartitions.get(resultPartitionID2).longValue()).isEqualTo(64L);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java
index 2591a9fead0..387e2bdc051 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java
@@ -22,17 +22,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link DefaultVertexParallelismDecider}. */
-public class DefaultVertexParallelismDeciderTest {
+class DefaultVertexParallelismDeciderTest {
 
     private static final long BYTE_256_MB = 256 * 1024 * 1024L;
     private static final long BYTE_512_MB = 512 * 1024 * 1024L;
@@ -47,8 +46,8 @@ public class DefaultVertexParallelismDeciderTest {
 
     private DefaultVertexParallelismDecider decider;
 
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    void before() throws Exception {
         Configuration configuration = new Configuration();
 
         configuration.setInteger(
@@ -66,19 +65,19 @@ public class DefaultVertexParallelismDeciderTest {
     }
 
     @Test
-    public void testNormalizedMaxAndMinParallelism() {
-        assertThat(decider.getMaxParallelism(), is(64));
-        assertThat(decider.getMinParallelism(), is(4));
+    void testNormalizedMaxAndMinParallelism() {
+        assertThat(decider.getMaxParallelism()).isEqualTo(64);
+        assertThat(decider.getMinParallelism()).isEqualTo(4);
     }
 
     @Test
-    public void testSourceJobVertex() {
+    void testSourceJobVertex() {
         int parallelism = 
decider.decideParallelismForVertex(Collections.emptyList());
-        assertThat(parallelism, is(DEFAULT_SOURCE_PARALLELISM));
+        assertThat(parallelism).isEqualTo(DEFAULT_SOURCE_PARALLELISM);
     }
 
     @Test
-    public void testNormalizeParallelismDownToPowerOf2() {
+    void testNormalizeParallelismDownToPowerOf2() {
         BlockingResultInfo resultInfo1 =
                 
BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB));
         BlockingResultInfo resultInfo2 =
@@ -88,11 +87,11 @@ public class DefaultVertexParallelismDeciderTest {
         int parallelism =
                 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism, is(8));
+        assertThat(parallelism).isEqualTo(8);
     }
 
     @Test
-    public void testNormalizeParallelismUpToPowerOf2() {
+    void testNormalizeParallelismUpToPowerOf2() {
         BlockingResultInfo resultInfo1 =
                 BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB));
         BlockingResultInfo resultInfo2 =
@@ -102,11 +101,11 @@ public class DefaultVertexParallelismDeciderTest {
         int parallelism =
                 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism, is(16));
+        assertThat(parallelism).isEqualTo(16);
     }
 
     @Test
-    public void testInitiallyNormalizedParallelismIsLargerThanMaxParallelism() {
+    void testInitiallyNormalizedParallelismIsLargerThanMaxParallelism() {
         BlockingResultInfo resultInfo1 =
                 BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB));
         BlockingResultInfo resultInfo2 =
@@ -116,11 +115,11 @@ public class DefaultVertexParallelismDeciderTest {
         int parallelism =
                 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism, is(64));
+        assertThat(parallelism).isEqualTo(64);
     }
 
     @Test
-    public void testInitiallyNormalizedParallelismIsSmallerThanMinParallelism() {
+    void testInitiallyNormalizedParallelismIsSmallerThanMinParallelism() {
         BlockingResultInfo resultInfo1 =
                 BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB));
         BlockingResultInfo resultInfo2 =
@@ -129,11 +128,11 @@ public class DefaultVertexParallelismDeciderTest {
         int parallelism =
                 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism, is(4));
+        assertThat(parallelism).isEqualTo(4);
     }
 
     @Test
-    public void testBroadcastRatioExceedsCapRatio() {
+    void testBroadcastRatioExceedsCapRatio() {
         BlockingResultInfo resultInfo1 =
                 BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_1_GB));
         BlockingResultInfo resultInfo2 =
@@ -142,11 +141,11 @@ public class DefaultVertexParallelismDeciderTest {
         int parallelism =
                 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism, is(16));
+        assertThat(parallelism).isEqualTo(16);
     }
 
     @Test
-    public void testNonBroadcastBytesCanNotDividedEvenly() {
+    void testNonBroadcastBytesCanNotDividedEvenly() {
         BlockingResultInfo resultInfo1 =
                 BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_512_MB));
         BlockingResultInfo resultInfo2 =
@@ -156,6 +155,6 @@ public class DefaultVertexParallelismDeciderTest {
         int parallelism =
                 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism, is(16));
+        assertThat(parallelism).isEqualTo(16);
     }
 }

Reply via email to