1996fanrui commented on code in PR #23204:
URL: https://github.com/apache/flink/pull/23204#discussion_r1295487947


##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java:
##########
@@ -233,18 +237,16 @@ public void 
canReconnectToOldLeaderWithSameLeaderAddress() throws Exception {
             leaderRetrievalService.notifyListener(
                     jobMasterGateway.getAddress(), 
jobMasterGateway.getFencingToken().toUUID());
 
-            try {
-                newLeaderFuture.get(10, TimeUnit.MILLISECONDS);
-                fail("The leader future should not be completed.");
-            } catch (TimeoutException expected) {
-            }
+            assertThatThrownBy(() -> newLeaderFuture.get(10, 
TimeUnit.MILLISECONDS))
+                    .withFailMessage("The leader future should not be 
completed.")
+                    .isInstanceOf(TimeoutException.class);

Review Comment:
   ```suggestion
               
FlinkAssertions.assertThatFuture(newLeaderFuture).willNotCompleteWithin(Duration.ofMillis(10));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java:
##########
@@ -124,7 +132,7 @@ private static CompletableFuture<JobResult> 
submitJobAndWaitUntilRunning(
         final CompletableFuture<JobResult> jobResultFuture =
                 miniCluster.requestJobResult(jobGraph.getJobID());
 
-        assertThat(jobResultFuture.isDone(), is(false));
+        assertThat(jobResultFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
           assertThat(jobResultFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java:
##########
@@ -192,7 +190,7 @@ public void testDeployedExecutionReporting() throws 
Exception {
 
             // nothing as deployed, so the deployment report should be empty
             taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
-            assertThat(deployedExecutionsQueue.take(), hasSize(0));
+            assertThat(deployedExecutionsQueue.take()).hasSize(0);

Review Comment:
   ```suggestion
               assertThat(deployedExecutionsQueue.take()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderServiceTest.java:
##########
@@ -278,7 +280,7 @@ public void 
rejectedJobManagerRegistrationCallsJobLeaderListener() throws Except
             leaderRetrievalService.notifyListener(
                     jobMasterGateway.getAddress(), 
jobMasterGateway.getFencingToken().toUUID());
 
-            assertThat(rejectedRegistrationFuture.get(), is(jobId));
+            assertThat(rejectedRegistrationFuture.get()).isEqualTo(jobId);

Review Comment:
   ```suggestion
               
assertThat(rejectedRegistrationFuture).isCompletedWithValue(jobId);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTest.java:
##########
@@ -49,11 +47,11 @@ public void 
testTaskSlotClosedOnlyWhenAddedTasksTerminated() throws Exception {
             task.waitForFailure();
             MemoryManager memoryManager = taskSlot.getMemoryManager();
 
-            assertThat(closingFuture.isDone(), is(false));
-            assertThat(memoryManager.isShutdown(), is(false));
+            assertThat(closingFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
               assertThat(closingFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/DefaultTimerServiceTest.java:
##########
@@ -50,22 +47,22 @@ public void testUnregisterAllTimeouts() throws Exception {
         timerService.unregisterAllTimeouts();
 
         Map<?, ?> timeouts = timerService.getTimeouts();
-        assertTrue(timeouts.isEmpty());
+        assertThat(timeouts.isEmpty()).isTrue();

Review Comment:
   ```suggestion
           assertThat(timeouts).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -222,14 +207,15 @@ public void 
testWorkingDirIsSetupWhenStartingTaskManagerRunner() throws Exceptio
         final TaskManagerRunner taskManagerRunner = 
createTaskManagerRunner(configuration);
 
         try {
-            assertTrue(workingDir.exists());
+            assertThat(workingDir.exists()).isTrue();
         } finally {
             taskManagerRunner.close();
         }
 
-        assertFalse(
-                "The working dir should be cleaned up when stopping the 
TaskManager process gracefully.",
-                workingDir.exists());
+        assertThat(workingDir.exists())
+                .withFailMessage(
+                        "The working dir should be cleaned up when stopping 
the TaskManager process gracefully.")
+                .isFalse();

Review Comment:
   ```suggestion
           assertThat(workingDir)
                   .withFailMessage(
                           "The working dir should be cleaned up when stopping 
the TaskManager process gracefully.")
                   .doesNotExist();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -222,14 +207,15 @@ public void 
testWorkingDirIsSetupWhenStartingTaskManagerRunner() throws Exceptio
         final TaskManagerRunner taskManagerRunner = 
createTaskManagerRunner(configuration);
 
         try {
-            assertTrue(workingDir.exists());
+            assertThat(workingDir.exists()).isTrue();

Review Comment:
   ```suggestion
               assertThat(workingDir).exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TimeUtils;
 
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)
+class TaskManagerRunnerTest {
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-
-    @Rule public final Timeout timeout = Timeout.seconds(30);
+    @TempDir private Path temporaryFolder;
 
     private TaskManagerRunner taskManagerRunner;
 
-    @After
-    public void after() throws Exception {
+    @AfterEach
+    void after() throws Exception {
         if (taskManagerRunner != null) {
             taskManagerRunner.close();
         }
     }
 
     @Test
-    public void testShouldShutdownOnFatalError() throws Exception {
+    void testShouldShutdownOnFatalError() throws Exception {
         Configuration configuration = createConfiguration();
         // very high timeout, to ensure that we don't fail because of 
registration timeouts
         configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, 
TimeUtils.parseDuration("42 h"));
         taskManagerRunner = createTaskManagerRunner(configuration);
 
         taskManagerRunner.onFatalError(new RuntimeException());
 
-        assertThat(
-                taskManagerRunner.getTerminationFuture().join(),
-                is(equalTo(TaskManagerRunner.Result.FAILURE)));
+        assertThat(taskManagerRunner.getTerminationFuture().join())
+                .isEqualTo(TaskManagerRunner.Result.FAILURE);
     }
 
     @Test
-    public void testShouldShutdownIfRegistrationWithJobManagerFails() throws 
Exception {
+    void testShouldShutdownIfRegistrationWithJobManagerFails() throws 
Exception {
         Configuration configuration = createConfiguration();
         configuration.set(
                 TaskManagerOptions.REGISTRATION_TIMEOUT, 
TimeUtils.parseDuration("10 ms"));
         taskManagerRunner = createTaskManagerRunner(configuration);
 
-        assertThat(
-                taskManagerRunner.getTerminationFuture().join(),
-                is(equalTo(TaskManagerRunner.Result.FAILURE)));
+        assertThat(taskManagerRunner.getTerminationFuture().join())
+                .isEqualTo(TaskManagerRunner.Result.FAILURE);

Review Comment:
   ```suggestion
       
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.FAILURE);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TimeUtils;
 
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)

Review Comment:
   Why add the `Timeout` here?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -220,7 +216,7 @@ private void 
testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(
                     taskExecutor.getSelfGateway(TaskExecutorGateway.class);
 
             trackerIsTrackingPartitions.set(true);
-            assertThat(firstReleasePartitionsCallFuture.isDone(), is(false));
+            assertThat(firstReleasePartitionsCallFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
               assertThat(firstReleasePartitionsCallFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2125,7 +2099,7 @@ public void testOfferSlotToJobMasterAfterTimeout() throws 
Exception {
                 new TestingJobMasterGatewayBuilder()
                         .setOfferSlotsFunction(
                                 (resourceID, slotOffers) -> {
-                                    assertThat(slotOffers.size(), is(1));
+                                    assertThat(slotOffers.size()).isOne();

Review Comment:
   ```suggestion
                                       assertThat(slotOffers).hasSize(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java:
##########
@@ -38,42 +38,50 @@
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration tests for the {@link TaskExecutor}. */
-public class TaskExecutorITCase extends TestLogger {
+class TaskExecutorITCase {
 
     private static final int NUM_TMS = 2;
     private static final int SLOTS_PER_TM = 2;
     private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
 
-    @Rule
-    public final MiniClusterResource miniClusterResource =
+    private final MiniClusterResource miniClusterResource =

Review Comment:
   Use the `MiniClusterExtension` instead; the `@BeforeEach` and 
`@AfterEach` methods can then be removed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java:
##########
@@ -60,31 +59,31 @@
 
 import static 
org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;

Review Comment:
   The `assertThat` is imported from the wrong package (Hamcrest instead of AssertJ).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2018,21 +1993,20 @@ public void 
testRegisterWithDefaultSlotResourceProfile() throws Exception {
                     testingResourceManagerGateway.getAddress(),
                     testingResourceManagerGateway.getFencingToken().toUUID());
 
-            assertThat(
-                    registeredDefaultSlotResourceProfileFuture.get(),
-                    equalTo(
+            assertThat(registeredDefaultSlotResourceProfileFuture.get())
+                    .isEqualTo(

Review Comment:
   ```suggestion
               
assertThat(registeredDefaultSlotResourceProfileFuture).isCompletedWithValue(
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java:
##########
@@ -203,16 +201,15 @@ public void testDeployedExecutionReporting() throws 
Exception {
 
             // task is deployed, so the deployment report should contain it
             taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
-            assertThat(
-                    deployedExecutionsQueue.take(),
-                    hasItem(taskDeploymentDescriptor.getExecutionAttemptId()));
+            assertThat(deployedExecutionsQueue.take())
+                    
.contains(taskDeploymentDescriptor.getExecutionAttemptId());
 
             TestingInvokable.sync.releaseBlocker();
 
             // task is finished ans was cleaned up, so the deployment report 
should be empty
             taskFinishedFuture.get();
             taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
-            assertThat(deployedExecutionsQueue.take(), hasSize(0));
+            assertThat(deployedExecutionsQueue.take()).hasSize(0);

Review Comment:
   ```suggestion
               assertThat(deployedExecutionsQueue.take()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -231,7 +227,7 @@ private void 
testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(
             firstReleasePartitionsCallFuture.get();
 
             // connection should be kept alive since the table still contains 
partitions
-            assertThat(disconnectFuture.isDone(), is(false));
+            assertThat(disconnectFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
               assertThat(disconnectFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -255,12 +251,12 @@ public void 
testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
                 (jobId, resultPartitionDeploymentDescriptor, taskExecutor, 
taskExecutorGateway) -> {
                     taskExecutorGateway.disconnectJobManager(jobId, new 
Exception("test"));
 
-                    assertThat(releasePartitionsForJobFuture.get(), 
equalTo(jobId));
+                    
assertThat(releasePartitionsForJobFuture.get()).isEqualTo(jobId);

Review Comment:
   ```suggestion
                       
assertThat(releasePartitionsForJobFuture).isCompletedWithValue(jobId);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java:
##########
@@ -146,20 +148,20 @@ public void testSubmitTaskFailure() throws Exception {
 
         try (TaskSubmissionTestEnvironment env =
                 new TaskSubmissionTestEnvironment.Builder(jobId)
-                        .build(EXECUTOR_RESOURCE.getExecutor())) {
+                        .build(EXECUTOR_EXTENSION.getExecutor())) {
             TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
             TaskSlotTable taskSlotTable = env.getTaskSlotTable();
 
             taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), 
Time.seconds(60));
-            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
-        } catch (Exception e) {
-            assertThat(e.getCause(), 
instanceOf(IllegalArgumentException.class));
+
+            assertThatThrownBy(() -> tmGateway.submitTask(tdd, 
env.getJobMasterId(), timeout).get())
+                    .hasCauseInstanceOf(IllegalArgumentException.class);

Review Comment:
   ```suggestion
               assertThatFuture(tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout))
                   .eventuallyFailsWith(ExecutionException.class)
                   .withCauseInstanceOf(IllegalArgumentException.class);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2234,19 +2208,20 @@ public void testDisconnectFromJobMasterWhenNewLeader() 
throws Exception {
             jobManagerLeaderRetriever.notifyListener(
                     jobMasterGateway.getAddress(), UUID.randomUUID());
 
-            assertThat(offeredSlotsFuture.get(), is(1));
+            assertThat(offeredSlotsFuture.get()).isOne();
 
             // notify loss of leadership
             jobManagerLeaderRetriever.notifyListener(null, null);
 
-            assertThat(disconnectFuture.get(), is(resourceID));
+            assertThat(disconnectFuture.get()).isEqualTo(resourceID);

Review Comment:
   ```suggestion
               assertThat(disconnectFuture).isCompletedWithValue(resourceID);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java:
##########
@@ -544,12 +540,11 @@ private void internalTestPartitionRelease(
             TestingInvokable.sync.awaitBlocker();
 
             // the task is still running => the partition is in in-progress 
and should be tracked
-            assertThat(
-                    startTrackingFuture.get(),
-                    equalTo(
+            assertThat(startTrackingFuture.get())
+                    .isEqualTo(

Review Comment:
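   ```suggestion
                       .isCompletedWithValue(
   ```
   Note: this also requires changing the preceding line to `assertThat(startTrackingFuture)` (without `.get()`).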



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -3086,13 +3058,13 @@ public void testSharedResourcesLifecycle() throws 
Exception {
                 ctx.taskExecutor.cancelTask(executions.get(i), timeout).get();
                 waitForTasks(ctx, numTasks -> numTasks > numRemaining);
                 if (numRemaining > 0) {
-                    assertEquals(0, 
SharedResourceCollectingInvokable.timesDeallocated.get());
+                    
assertThat(SharedResourceCollectingInvokable.timesDeallocated.get()).isZero();
                 }
             }
         }
         // verify
-        assertEquals(1, 
SharedResourceCollectingInvokable.timesAllocated.get());
-        assertEquals(1, 
SharedResourceCollectingInvokable.timesDeallocated.get());
+        
assertThat(SharedResourceCollectingInvokable.timesAllocated.get()).isOne();
+        
assertThat(SharedResourceCollectingInvokable.timesDeallocated.get()).isOne();

Review Comment:
   ```suggestion
           
assertThat(SharedResourceCollectingInvokable.timesAllocated).hasValue(1);
           
assertThat(SharedResourceCollectingInvokable.timesDeallocated).hasValue(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -204,14 +190,13 @@ public void 
testUnexpectedTaskManagerTerminationAfterRunnerCloseWillBeIgnored()
 
         terminationFuture.complete(null);
 
-        assertThat(
-                taskManagerRunner.getTerminationFuture().join(),
-                is(equalTo(TaskManagerRunner.Result.SUCCESS)));
+        assertThat(taskManagerRunner.getTerminationFuture().join())
+                .isEqualTo(TaskManagerRunner.Result.SUCCESS);

Review Comment:
   ```suggestion
           
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.SUCCESS);
   ```
   
   I guess these `CompletableFuture.join()` calls can be replaced by 
`isCompletedWithValue`; please correct me if I'm wrong, thanks!



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -258,10 +244,11 @@ public void testWorkingDirIsNotDeletedInCaseOfFailure() 
throws Exception {
 
         taskManagerRunner.getTerminationFuture().join();
 
-        assertTrue(
-                ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
-                                configuration, resourceId)
-                        .exists());
+        assertThat(
+                        
ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
+                                        configuration, resourceId)
+                                .exists())
+                .isTrue();

Review Comment:
   ```suggestion
           assertThat(
                           
ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(
                                           configuration, resourceId)).exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2257,34 +2232,31 @@ public void testLogNotFoundHandling() throws Throwable {
                 new Builder(jobId)
                         .setConfiguration(configuration)
                         .setLocalCommunication(false)
-                        .build(EXECUTOR_RESOURCE.getExecutor())) {
+                        .build(EXECUTOR_EXTENSION.getExecutor())) {
             TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-            try {
-                CompletableFuture<TransientBlobKey> logFuture =
-                        tmGateway.requestFileUploadByType(FileType.LOG, 
timeout);
-                logFuture.get();
-            } catch (Exception e) {
-                assertThat(
-                        e.getMessage(),
-                        containsString("The file LOG does not exist on the 
TaskExecutor."));
-            }
+            CompletableFuture<TransientBlobKey> logFuture =
+                    tmGateway.requestFileUploadByType(FileType.LOG, timeout);
+            assertThatThrownBy(logFuture::get)
+                    .hasMessageContaining("The file LOG does not exist on the 
TaskExecutor.")
+                    .isInstanceOf(ExecutionException.class);

Review Comment:
   ```suggestion
   FlinkAssertions
                   
.assertThatFuture(logFuture).eventuallyFails().withThrowableOfType(ExecutionException.class).withMessageContaining("The
 file LOG does not exist on the TaskExecutor.");
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -267,118 +258,110 @@ public void 
testSlotAllocationWithConcreteResourceProfile() throws Exception {
                             
ResourceProfile.newBuilder().setCpuCores(0.1).build());
 
             assertThat(
-                    taskSlotTable.allocateSlot(
-                            -1, jobId, allocationId, resourceProfile, 
SLOT_TIMEOUT),
-                    is(true));
+                            taskSlotTable.allocateSlot(
+                                    -1, jobId, allocationId, resourceProfile, 
SLOT_TIMEOUT))
+                    .isTrue();
 
             Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
                     taskSlotTable.getAllocatedSlots(jobId);
             TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
-            assertThat(allocatedSlot.getIndex(), is(2));
-            assertThat(allocatedSlot.getResourceProfile(), 
is(resourceProfile));
-            assertThat(allocatedSlots.hasNext(), is(false));
+            assertThat(allocatedSlot.getIndex()).isEqualTo(2);
+            
assertThat(allocatedSlot.getResourceProfile()).isEqualTo(resourceProfile);
+            assertThat(allocatedSlots.hasNext()).isFalse();
         }
     }
 
     @Test
-    public void testSlotAllocationWithUnknownResourceProfile() throws 
Exception {
+    void testSlotAllocationWithUnknownResourceProfile() throws Exception {
         try (final TaskSlotTableImpl<TaskSlotPayload> taskSlotTable =
                 createTaskSlotTableAndStart(2)) {
             final JobID jobId = new JobID();
             final AllocationID allocationId = new AllocationID();
 
             assertThat(
-                    taskSlotTable.allocateSlot(
-                            -1, jobId, allocationId, ResourceProfile.UNKNOWN, 
SLOT_TIMEOUT),
-                    is(true));
+                            taskSlotTable.allocateSlot(
+                                    -1, jobId, allocationId, 
ResourceProfile.UNKNOWN, SLOT_TIMEOUT))
+                    .isTrue();
 
             Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
                     taskSlotTable.getAllocatedSlots(jobId);
             TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
-            assertThat(allocatedSlot.getIndex(), is(2));
-            assertThat(
-                    allocatedSlot.getResourceProfile(), 
is(TaskSlotUtils.DEFAULT_RESOURCE_PROFILE));
-            assertThat(allocatedSlots.hasNext(), is(false));
+            assertThat(allocatedSlot.getIndex()).isEqualTo(2);
+            assertThat(allocatedSlot.getResourceProfile())
+                    .isEqualTo(TaskSlotUtils.DEFAULT_RESOURCE_PROFILE);
+            assertThat(allocatedSlots.hasNext()).isFalse();
         }
     }
 
     @Test
-    public void testSlotAllocationWithResourceProfileFailure() throws 
Exception {
+    void testSlotAllocationWithResourceProfileFailure() throws Exception {
         try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = 
createTaskSlotTableAndStart(2)) {
             final JobID jobId = new JobID();
             final AllocationID allocationId = new AllocationID();
             ResourceProfile resourceProfile = 
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE;
             resourceProfile = 
resourceProfile.merge(resourceProfile).merge(resourceProfile);
 
             assertThat(
-                    taskSlotTable.allocateSlot(
-                            -1, jobId, allocationId, resourceProfile, 
SLOT_TIMEOUT),
-                    is(false));
+                            taskSlotTable.allocateSlot(
+                                    -1, jobId, allocationId, resourceProfile, 
SLOT_TIMEOUT))
+                    .isFalse();
 
             Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
                     taskSlotTable.getAllocatedSlots(jobId);
-            assertThat(allocatedSlots.hasNext(), is(false));
+            assertThat(allocatedSlots.hasNext()).isFalse();
         }
     }
 
     @Test
-    public void testGenerateSlotReport() throws Exception {
+    void testGenerateSlotReport() throws Exception {
         try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = 
createTaskSlotTableAndStart(3)) {
             final JobID jobId = new JobID();
             final AllocationID allocationId1 = new AllocationID();
             final AllocationID allocationId2 = new AllocationID();
             final AllocationID allocationId3 = new AllocationID();
 
-            assertThat(
-                    taskSlotTable.allocateSlot(0, jobId, allocationId1, 
SLOT_TIMEOUT),
-                    is(true)); // index 0
-            assertThat(
-                    taskSlotTable.allocateSlot(-1, jobId, allocationId2, 
SLOT_TIMEOUT),
-                    is(true)); // index 3
-            assertThat(
-                    taskSlotTable.allocateSlot(-1, jobId, allocationId3, 
SLOT_TIMEOUT),
-                    is(true)); // index 4
+            assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId1, 
SLOT_TIMEOUT))
+                    .isTrue(); // index 0
+            assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId2, 
SLOT_TIMEOUT))
+                    .isTrue(); // index 3
+            assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId3, 
SLOT_TIMEOUT))
+                    .isTrue(); // index 4
 
-            assertThat(taskSlotTable.freeSlot(allocationId2), is(3));
+            assertThat(taskSlotTable.freeSlot(allocationId2)).isEqualTo(3);
 
             ResourceID resourceId = ResourceID.generate();
             SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
             List<SlotStatus> slotStatuses = new ArrayList<>();
             slotReport.iterator().forEachRemaining(slotStatuses::add);
 
-            assertThat(slotStatuses.size(), is(4));
-            assertThat(
-                    slotStatuses,
-                    containsInAnyOrder(
-                            is(
-                                    new SlotStatus(
-                                            new SlotID(resourceId, 0),
-                                            
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
-                                            jobId,
-                                            allocationId1)),
-                            is(
-                                    new SlotStatus(
-                                            new SlotID(resourceId, 1),
-                                            
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
-                                            null,
-                                            null)),
-                            is(
-                                    new SlotStatus(
-                                            new SlotID(resourceId, 2),
-                                            
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
-                                            null,
-                                            null)),
-                            is(
-                                    new SlotStatus(
-                                            new SlotID(resourceId, 4),
-                                            
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
-                                            jobId,
-                                            allocationId3))));
+            assertThat(slotStatuses.size()).isEqualTo(4);

Review Comment:
   ```suggestion
               assertThat(slotStatuses).hasSize(4);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -427,35 +411,37 @@ public void testRemoveTaskCallsFreeSlotAction() throws 
Exception {
             // to enable that the last remaining finished task does the final 
slot freeing
             taskSlotTable.freeSlot(allocationId);
             taskSlotTable.removeTask(executionAttemptId);
-            assertThat(freeSlotFuture.get(), is(allocationId));
+            assertThat(freeSlotFuture.get()).isEqualTo(allocationId);

Review Comment:
   ```suggestion
               assertThat(freeSlotFuture).isCompletedWithValue(allocationId);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -467,20 +453,21 @@ public void testAllocatedSlotTimeout() throws Exception {
                 createTaskSlotTableAndStart(1, testingSlotActions)) {
             final AllocationID allocationId = new AllocationID();
             assertThat(
-                    taskSlotTable.allocateSlot(0, new JobID(), allocationId, 
Time.milliseconds(1L)),
-                    is(true));
-            assertThat(timeoutFuture.join(), is(allocationId));
+                            taskSlotTable.allocateSlot(
+                                    0, new JobID(), allocationId, 
Time.milliseconds(1L)))
+                    .isTrue();
+            assertThat(timeoutFuture.join()).isEqualTo(allocationId);

Review Comment:
   ```suggestion
               assertThat(timeoutFuture).isCompletedWithValue(allocationId);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java:
##########
@@ -427,35 +411,37 @@ public void testRemoveTaskCallsFreeSlotAction() throws 
Exception {
             // to enable that the last remaining finished task does the final 
slot freeing
             taskSlotTable.freeSlot(allocationId);
             taskSlotTable.removeTask(executionAttemptId);
-            assertThat(freeSlotFuture.get(), is(allocationId));
+            assertThat(freeSlotFuture.get()).isEqualTo(allocationId);
         }
     }
 
-    @Test(timeout = 10000)
-    public void testFreeSlotInterruptsSubmittedTask() throws Exception {
+    @Test
+    @Timeout(10)
+    void testFreeSlotInterruptsSubmittedTask() throws Exception {
         TestingTaskSlotPayload task = new TestingTaskSlotPayload();
         try (final TaskSlotTable<TaskSlotPayload> taskSlotTable =
                 createTaskSlotTableWithStartedTask(task)) {
-            assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+            
assertThat(taskSlotTable.freeSlot(task.getAllocationId())).isEqualTo(-1);
             task.waitForFailure();
             task.terminate();
         }
     }
 
-    @Test(timeout = 10000)
-    public void testTableIsClosedOnlyWhenAllTasksTerminated() throws Exception 
{
+    @Test
+    @Timeout(10)
+    void testTableIsClosedOnlyWhenAllTasksTerminated() throws Exception {
         TestingTaskSlotPayload task = new TestingTaskSlotPayload();
         final TaskSlotTable<TaskSlotPayload> taskSlotTable =
                 createTaskSlotTableWithStartedTask(task);
-        assertThat(taskSlotTable.freeSlot(task.getAllocationId()), is(-1));
+        
assertThat(taskSlotTable.freeSlot(task.getAllocationId())).isEqualTo(-1);
         CompletableFuture<Void> closingFuture = taskSlotTable.closeAsync();
-        assertThat(closingFuture.isDone(), is(false));
+        assertThat(closingFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
           assertThat(closingFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java:
##########
@@ -19,81 +19,75 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collection;
 import java.util.Collections;
 
-import static junit.framework.TestCase.assertNotNull;
-import static junit.framework.TestCase.assertTrue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link PartitionTable}. */
-public class PartitionTableTest extends TestLogger {
+class PartitionTableTest {
 
     private static final JobID JOB_ID = new JobID();
     private static final ResultPartitionID PARTITION_ID = new 
ResultPartitionID();
 
     @Test
-    public void testEmptyTable() {
+    void testEmptyTable() {
         final PartitionTable<JobID> table = new PartitionTable<>();
 
         // an empty table should always return an empty collection
         Collection<ResultPartitionID> partitionsForNonExistingJob =
                 table.stopTrackingPartitions(JOB_ID);
-        assertNotNull(partitionsForNonExistingJob);
-        assertThat(partitionsForNonExistingJob, empty());
+        assertThat(partitionsForNonExistingJob).isNotNull();
+        assertThat(partitionsForNonExistingJob).isEmpty();

Review Comment:
   ```suggestion
           assertThat(partitionsForNonExistingJob).isNotNull().isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TimeUtils;
 
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)
+class TaskManagerRunnerTest {
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-
-    @Rule public final Timeout timeout = Timeout.seconds(30);
+    @TempDir private Path temporaryFolder;
 
     private TaskManagerRunner taskManagerRunner;
 
-    @After
-    public void after() throws Exception {
+    @AfterEach
+    void after() throws Exception {
         if (taskManagerRunner != null) {
             taskManagerRunner.close();
         }
     }
 
     @Test
-    public void testShouldShutdownOnFatalError() throws Exception {
+    void testShouldShutdownOnFatalError() throws Exception {
         Configuration configuration = createConfiguration();
         // very high timeout, to ensure that we don't fail because of 
registration timeouts
         configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, 
TimeUtils.parseDuration("42 h"));
         taskManagerRunner = createTaskManagerRunner(configuration);
 
         taskManagerRunner.onFatalError(new RuntimeException());
 
-        assertThat(
-                taskManagerRunner.getTerminationFuture().join(),
-                is(equalTo(TaskManagerRunner.Result.FAILURE)));
+        assertThat(taskManagerRunner.getTerminationFuture().join())
+                .isEqualTo(TaskManagerRunner.Result.FAILURE);
     }
 
     @Test
-    public void testShouldShutdownIfRegistrationWithJobManagerFails() throws 
Exception {
+    void testShouldShutdownIfRegistrationWithJobManagerFails() throws 
Exception {
         Configuration configuration = createConfiguration();
         configuration.set(
                 TaskManagerOptions.REGISTRATION_TIMEOUT, 
TimeUtils.parseDuration("10 ms"));
         taskManagerRunner = createTaskManagerRunner(configuration);
 
-        assertThat(
-                taskManagerRunner.getTerminationFuture().join(),
-                is(equalTo(TaskManagerRunner.Result.FAILURE)));
+        assertThat(taskManagerRunner.getTerminationFuture().join())
+                .isEqualTo(TaskManagerRunner.Result.FAILURE);
     }
 
     @Test
-    public void testGenerateTaskManagerResourceIDWithMetaData() throws 
Exception {
+    void testGenerateTaskManagerResourceIDWithMetaData() throws Exception {
         final Configuration configuration = createConfiguration();
         final String metadata = "test";
         
configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, 
metadata);
         final ResourceID taskManagerResourceID =
                 TaskManagerRunner.getTaskManagerResourceID(configuration, "", 
-1).unwrap();
 
-        assertThat(taskManagerResourceID.getMetadata(), equalTo(metadata));
+        assertThat(taskManagerResourceID.getMetadata()).isEqualTo(metadata);
     }
 
     @Test
-    public void testGenerateTaskManagerResourceIDWithoutMetaData() throws 
Exception {
+    void testGenerateTaskManagerResourceIDWithoutMetaData() throws Exception {
         final Configuration configuration = createConfiguration();
         final String resourceID = "test";
         configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, 
resourceID);
         final ResourceID taskManagerResourceID =
                 TaskManagerRunner.getTaskManagerResourceID(configuration, "", 
-1).unwrap();
 
-        assertThat(taskManagerResourceID.getMetadata(), equalTo(""));
-        assertThat(taskManagerResourceID.getStringWithMetadata(), 
equalTo("test"));
+        assertThat(taskManagerResourceID.getMetadata()).isEqualTo("");

Review Comment:
   ```suggestion
           assertThat(taskManagerResourceID.getMetadata()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -36,139 +36,127 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TimeUtils;
 
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link TaskManagerRunner}. */
-public class TaskManagerRunnerTest extends TestLogger {
+@Timeout(30)
+class TaskManagerRunnerTest {
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-
-    @Rule public final Timeout timeout = Timeout.seconds(30);
+    @TempDir private Path temporaryFolder;
 
     private TaskManagerRunner taskManagerRunner;
 
-    @After
-    public void after() throws Exception {
+    @AfterEach
+    void after() throws Exception {
         if (taskManagerRunner != null) {
             taskManagerRunner.close();
         }
     }
 
     @Test
-    public void testShouldShutdownOnFatalError() throws Exception {
+    void testShouldShutdownOnFatalError() throws Exception {
         Configuration configuration = createConfiguration();
         // very high timeout, to ensure that we don't fail because of 
registration timeouts
         configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, 
TimeUtils.parseDuration("42 h"));
         taskManagerRunner = createTaskManagerRunner(configuration);
 
         taskManagerRunner.onFatalError(new RuntimeException());
 
-        assertThat(
-                taskManagerRunner.getTerminationFuture().join(),
-                is(equalTo(TaskManagerRunner.Result.FAILURE)));
+        assertThat(taskManagerRunner.getTerminationFuture().join())
+                .isEqualTo(TaskManagerRunner.Result.FAILURE);

Review Comment:
   ```suggestion
       
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.FAILURE);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java:
##########
@@ -181,14 +169,12 @@ public void 
testUnexpectedTaskManagerTerminationFailsRunnerFatally() throws Exce
 
         terminationFuture.complete(null);
 
-        assertThat(
-                taskManagerRunner.getTerminationFuture().join(),
-                is(equalTo(TaskManagerRunner.Result.FAILURE)));
+        assertThat(taskManagerRunner.getTerminationFuture().join())

Review Comment:
   ```suggestion
       
assertThat(taskManagerRunner.getTerminationFuture()).isCompletedWithValue(TaskManagerRunner.Result.FAILURE);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -1708,9 +1694,9 @@ public void testRemoveJobFromJobLeaderService() throws 
Exception {
             final SlotID slotId = buildSlotID(0);
             final AllocationID allocationId = new AllocationID();
 
-            assertThat(startFuture.isDone(), is(false));
+            assertThat(startFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
               assertThat(startFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -893,7 +880,7 @@ public void testTaskSlotTableTerminationOnShutdown() throws 
Exception {
         }
 
         // check task executor is waiting for the task completion and has not 
terminated yet
-        assertThat(taskExecutorTerminationFuture.isDone(), is(false));
+        assertThat(taskExecutorTerminationFuture.isDone()).isFalse();

Review Comment:
   ```suggestion
           assertThat(taskExecutorTerminationFuture).isNotDone();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -1871,15 +1852,11 @@ public void testIgnoringSlotRequestsIfNotRegistered() 
throws Exception {
                             testingResourceManagerGateway.getFencingToken(),
                             timeout);
 
-            try {
-                slotRequestResponse.get();
-                fail(
-                        "We should not be able to request slots before the 
TaskExecutor is registered at the ResourceManager.");
-            } catch (ExecutionException ee) {
-                assertThat(
-                        ExceptionUtils.stripExecutionException(ee),
-                        instanceOf(TaskManagerException.class));
-            }
+            assertThatThrownBy(slotRequestResponse::get)
+                    .withFailMessage(
+                            "We should not be able to request slots before the 
TaskExecutor is registered at the ResourceManager.")
+                    .isInstanceOf(ExecutionException.class)
+                    .hasCauseInstanceOf(TaskManagerException.class);

Review Comment:
   ```suggestion
               FlinkAssertions.assertThatFuture(slotRequestResponse)
                       .withFailMessage(
                               "We should not be able to request slots before 
the TaskExecutor is registered at the ResourceManager.")
                       .eventuallyFailsWith(ExecutionException.class)
                       .withCauseInstanceOf(TaskManagerException.class);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -200,13 +189,11 @@ public class TaskExecutorTest extends TestLogger {
                     MemorySize.parse("4m"),
                     Collections.emptyList());
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
-
-    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
+            TestingUtils.defaultExecutorExtension();
 
-    @Rule public final TestName testName = new TestName();
+    @TempDir private Path tmp;

Review Comment:
   How about renaming it to `tmpDir`? The name `tmp` is ambiguous.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -589,21 +576,21 @@ private void runResourceManagerHeartbeatTest(
 
             // heartbeat timeout should trigger disconnect TaskManager from 
ResourceManager
             assertThat(
-                    taskExecutorDisconnectFuture.get(
-                            timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
-                    equalTo(unresolvedTaskManagerLocation.getResourceID()));
+                            taskExecutorDisconnectFuture.get(
+                                    timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS))
+                    .isEqualTo(unresolvedTaskManagerLocation.getResourceID());

Review Comment:
   ```suggestion
                               taskExecutorDisconnectFuture)
                               .succeedsWithin(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS)
                       
.isEqualTo(unresolvedTaskManagerLocation.getResourceID());
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -1980,14 +1955,14 @@ public void testInitialSlotReport() throws Exception {
                     testingResourceManagerGateway.getAddress(),
                     testingResourceManagerGateway.getFencingToken().toUUID());
 
-            assertThat(initialSlotReportFuture.get(), 
equalTo(taskExecutor.getResourceID()));
+            
assertThat(initialSlotReportFuture.get()).isEqualTo(taskExecutor.getResourceID());

Review Comment:
   ```suggestion
               
assertThat(initialSlotReportFuture).isCompletedWithValue(taskExecutor.getResourceID());
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -683,10 +670,9 @@ public void testHeartbeatReporting() throws Exception {
 
             // register resource manager success will trigger monitoring 
heartbeat target between tm
             // and rm
-            assertThat(
-                    taskExecutorRegistrationFuture.get(),
-                    equalTo(unresolvedTaskManagerLocation.getResourceID()));
-            assertThat(initialSlotReportFuture.get(), equalTo(slotReport1));
+            assertThat(taskExecutorRegistrationFuture.get())
+                    .isEqualTo(unresolvedTaskManagerLocation.getResourceID());
+            assertThat(initialSlotReportFuture.get()).isEqualTo(slotReport1);

Review Comment:
   ```suggestion
               assertThat(taskExecutorRegistrationFuture)
                       
.isCompletedWithValue(unresolvedTaskManagerLocation.getResourceID());
               
assertThat(initialSlotReportFuture).isCompletedWithValue(slotReport1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2162,21 +2136,21 @@ public void testOfferSlotToJobMasterAfterTimeout() 
throws Exception {
 
             slotOfferings.await();
 
-            assertThat(offeredSlotFuture.get(), is(allocationId));
-            assertTrue(taskSlotTable.isSlotFree(1));
+            assertThat(offeredSlotFuture.get()).isEqualTo(allocationId);

Review Comment:
   ```suggestion
               assertThat(offeredSlotFuture).isCompletedWithValue(allocationId);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2687,23 +2657,23 @@ public void 
testReleaseOfJobResourcesIfJobMasterIsNotCorrect() throws Exception
                     jobMasterGateway.getAddress(), 
jobMasterGateway.getFencingToken().toUUID());
 
             // the slot should be freed
-            assertThat(availableSlotFuture.get().f1, is(slotId));
-            assertThat(availableSlotFuture.get().f2, is(allocationId));
+            assertThat(availableSlotFuture.get().f1).isEqualTo(slotId);
+            assertThat(availableSlotFuture.get().f2).isEqualTo(allocationId);
 
             // all job partitions should be released
-            assertThat(jobPartitionsReleaseFuture.get(), is(jobId));
+            assertThat(jobPartitionsReleaseFuture.get()).isEqualTo(jobId);

Review Comment:
   ```suggestion
               
assertThat(jobPartitionsReleaseFuture).isCompletedWithValue(jobId);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -3086,13 +3058,13 @@ public void testSharedResourcesLifecycle() throws 
Exception {
                 ctx.taskExecutor.cancelTask(executions.get(i), timeout).get();
                 waitForTasks(ctx, numTasks -> numTasks > numRemaining);
                 if (numRemaining > 0) {
-                    assertEquals(0, 
SharedResourceCollectingInvokable.timesDeallocated.get());
+                    
assertThat(SharedResourceCollectingInvokable.timesDeallocated.get()).isZero();

Review Comment:
   ```suggestion
                       
assertThat(SharedResourceCollectingInvokable.timesDeallocated).hasValue(0);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java:
##########
@@ -2234,19 +2208,20 @@ public void testDisconnectFromJobMasterWhenNewLeader() 
throws Exception {
             jobManagerLeaderRetriever.notifyListener(
                     jobMasterGateway.getAddress(), UUID.randomUUID());
 
-            assertThat(offeredSlotsFuture.get(), is(1));
+            assertThat(offeredSlotsFuture.get()).isOne();

Review Comment:
   ```suggestion
               assertThat(offeredSlotsFuture).isCompletedWithValue(1);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to