This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 64c725f4a6e [FLINK-32837][JUnit5 Migration] Migrate the client and clusterframework packages of flink-runtime module to junit5 (#23241)
64c725f4a6e is described below

commit 64c725f4a6ed0073a0559f4b2eadaf909608abf2
Author: Rui Fan <[email protected]>
AuthorDate: Wed Aug 23 21:51:47 2023 +0800

    [FLINK-32837][JUnit5 Migration] Migrate the client and clusterframework packages of flink-runtime module to junit5 (#23241)
---
 .../flink/runtime/client/ClientUtilsTest.java      | 100 ++-
 .../client/SerializedJobExecutionResultTest.java   |  74 +-
 .../clusterframework/ApplicationStatusTest.java    |  27 +-
 .../clusterframework/BootstrapToolsTest.java       | 844 ++++++++++-----------
 .../TaskExecutorProcessUtilsTest.java              | 357 ++++-----
 .../types/ResourceBudgetManagerTest.java           |  34 +-
 .../types/ResourceProfileTest.java                 | 235 +++---
 .../jobmanager/JobManagerProcessUtilsTest.java     | 173 ++---
 8 files changed, 890 insertions(+), 954 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
index 78a25aff5f1..6d600a112a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -28,47 +28,49 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ClientUtils}. */
-public class ClientUtilsTest extends TestLogger {
+public class ClientUtilsTest {
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+    @TempDir private static java.nio.file.Path temporaryFolder;
 
     private static BlobServer blobServer = null;
 
-    @BeforeClass
-    public static void setup() throws IOException {
+    @BeforeAll
+    static void setup() throws IOException {
         Configuration config = new Configuration();
-        blobServer = new BlobServer(config, temporaryFolder.newFolder(), new 
VoidBlobStore());
+        blobServer =
+                new BlobServer(
+                        config, TempDirUtils.newFolder(temporaryFolder), new 
VoidBlobStore());
         blobServer.start();
     }
 
-    @AfterClass
-    public static void teardown() throws IOException {
+    @AfterAll
+    static void teardown() throws IOException {
         if (blobServer != null) {
             blobServer.close();
         }
     }
 
     @Test
-    public void uploadAndSetUserJars() throws Exception {
-        java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+    void uploadAndSetUserJars() throws Exception {
+        java.nio.file.Path tmpDir = 
TempDirUtils.newFolder(temporaryFolder).toPath();
         JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
 
         Collection<Path> jars =
@@ -78,8 +80,8 @@ public class ClientUtilsTest extends TestLogger {
 
         jars.forEach(jobGraph::addJar);
 
-        assertEquals(jars.size(), jobGraph.getUserJars().size());
-        assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+        assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
+        assertThat(jobGraph.getUserJarBlobKeys()).isEmpty();
 
         ClientUtils.extractAndUploadJobGraphFiles(
                 jobGraph,
@@ -88,9 +90,9 @@ public class ClientUtilsTest extends TestLogger {
                                 new InetSocketAddress("localhost", 
blobServer.getPort()),
                                 new Configuration()));
 
-        assertEquals(jars.size(), jobGraph.getUserJars().size());
-        assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
-        assertEquals(jars.size(), 
jobGraph.getUserJarBlobKeys().stream().distinct().count());
+        assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
+        assertThat(jobGraph.getUserJarBlobKeys()).hasSameSizeAs(jars);
+        
assertThat(jobGraph.getUserJarBlobKeys().stream().distinct()).hasSameSizeAs(jars);
 
         for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
             blobServer.getFile(jobGraph.getJobID(), blobKey);
@@ -98,8 +100,8 @@ public class ClientUtilsTest extends TestLogger {
     }
 
     @Test
-    public void uploadAndSetUserArtifacts() throws Exception {
-        java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+    void uploadAndSetUserArtifacts() throws Exception {
+        java.nio.file.Path tmpDir = 
TempDirUtils.newFolder(temporaryFolder).toPath();
         JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
 
         Collection<DistributedCache.DistributedCacheEntry> localArtifacts =
@@ -114,7 +116,7 @@ public class ClientUtilsTest extends TestLogger {
                                 
Files.createFile(tmpDir.resolve("art4")).toString(), true, false));
 
         Collection<DistributedCache.DistributedCacheEntry> 
distributedArtifacts =
-                Arrays.asList(
+                Collections.singletonList(
                         new DistributedCache.DistributedCacheEntry(
                                 "hdfs://localhost:1234/test", true, false));
 
@@ -127,12 +129,11 @@ public class ClientUtilsTest extends TestLogger {
 
         final int totalNumArtifacts = localArtifacts.size() + 
distributedArtifacts.size();
 
-        assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
-        assertEquals(
-                0,
-                jobGraph.getUserArtifacts().values().stream()
-                        .filter(entry -> entry.blobKey != null)
-                        .count());
+        assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
+        assertThat(
+                        jobGraph.getUserArtifacts().values().stream()
+                                .filter(entry -> entry.blobKey != null))
+                .isEmpty();
 
         ClientUtils.extractAndUploadJobGraphFiles(
                 jobGraph,
@@ -141,24 +142,21 @@ public class ClientUtilsTest extends TestLogger {
                                 new InetSocketAddress("localhost", 
blobServer.getPort()),
                                 new Configuration()));
 
-        assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
-        assertEquals(
-                localArtifacts.size(),
-                jobGraph.getUserArtifacts().values().stream()
-                        .filter(entry -> entry.blobKey != null)
-                        .count());
-        assertEquals(
-                distributedArtifacts.size(),
-                jobGraph.getUserArtifacts().values().stream()
-                        .filter(entry -> entry.blobKey == null)
-                        .count());
+        assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
+        assertThat(
+                        jobGraph.getUserArtifacts().values().stream()
+                                .filter(entry -> entry.blobKey != null))
+                .hasSameSizeAs(localArtifacts);
+        assertThat(
+                        jobGraph.getUserArtifacts().values().stream()
+                                .filter(entry -> entry.blobKey == null))
+                .hasSameSizeAs(distributedArtifacts);
         // 1 unique key for each local artifact, and null for distributed 
artifacts
-        assertEquals(
-                localArtifacts.size() + 1,
-                jobGraph.getUserArtifacts().values().stream()
-                        .map(entry -> entry.blobKey)
-                        .distinct()
-                        .count());
+        assertThat(
+                        jobGraph.getUserArtifacts().values().stream()
+                                .map(entry -> entry.blobKey)
+                                .distinct())
+                .hasSize(localArtifacts.size() + 1);
         for (DistributedCache.DistributedCacheEntry original : localArtifacts) 
{
             assertState(
                     original,
@@ -181,10 +179,10 @@ public class ClientUtilsTest extends TestLogger {
             boolean isBlobKeyNull,
             JobID jobId)
             throws Exception {
-        assertEquals(original.isZipped, actual.isZipped);
-        assertEquals(original.isExecutable, actual.isExecutable);
-        assertEquals(original.filePath, actual.filePath);
-        assertEquals(isBlobKeyNull, actual.blobKey == null);
+        assertThat(actual.isZipped).isEqualTo(original.isZipped);
+        assertThat(actual.isExecutable).isEqualTo(original.isExecutable);
+        assertThat(actual.filePath).isEqualTo(original.filePath);
+        assertThat(actual.blobKey == null).isEqualTo(isBlobKeyNull);
         if (!isBlobKeyNull) {
             blobServer.getFile(
                     jobId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
index 83101dba951..4188b19e399 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
@@ -22,28 +22,24 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the SerializedJobExecutionResult */
-public class SerializedJobExecutionResultTest extends TestLogger {
+class SerializedJobExecutionResultTest {
 
     @Test
-    public void testSerialization() throws Exception {
+    void testSerialization() throws Exception {
         final ClassLoader classloader = getClass().getClassLoader();
 
         JobID origJobId = new JobID();
@@ -62,61 +58,53 @@ public class SerializedJobExecutionResultTest extends 
TestLogger {
         // serialize and deserialize the object
         SerializedJobExecutionResult cloned = 
CommonTestUtils.createCopySerializable(result);
 
-        assertEquals(origJobId, cloned.getJobId());
-        assertEquals(origTime, cloned.getNetRuntime());
-        assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
-        assertEquals(origMap, cloned.getSerializedAccumulatorResults());
+        assertThat(cloned.getJobId()).isEqualTo(origJobId);
+        assertThat(cloned.getNetRuntime()).isEqualTo(origTime);
+        
assertThat(cloned.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
+        
assertThat(cloned.getSerializedAccumulatorResults()).isEqualTo(origMap);
 
         // convert to deserialized result
         JobExecutionResult jResult = result.toJobExecutionResult(classloader);
         JobExecutionResult jResultCopied = 
result.toJobExecutionResult(classloader);
 
-        assertEquals(origJobId, jResult.getJobID());
-        assertEquals(origJobId, jResultCopied.getJobID());
-        assertEquals(origTime, jResult.getNetRuntime());
-        assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
-        assertEquals(origTime, jResultCopied.getNetRuntime());
-        assertEquals(origTime, 
jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));
+        assertThat(jResult.getJobID()).isEqualTo(origJobId);
+        assertThat(jResultCopied.getJobID()).isEqualTo(origJobId);
+        assertThat(jResult.getNetRuntime()).isEqualTo(origTime);
+        
assertThat(jResult.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
+        assertThat(jResultCopied.getNetRuntime()).isEqualTo(origTime);
+        
assertThat(jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
 
         for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry 
:
                 origMap.entrySet()) {
             String name = entry.getKey();
             OptionalFailure<Object> value = 
entry.getValue().deserializeValue(classloader);
             if (value.isFailure()) {
-                try {
-                    jResult.getAccumulatorResult(name);
-                    fail("expected failure");
-                } catch (FlinkRuntimeException ex) {
-                    assertTrue(
-                            ExceptionUtils.findThrowable(ex, 
ExpectedTestException.class)
-                                    .isPresent());
-                }
-                try {
-                    jResultCopied.getAccumulatorResult(name);
-                    fail("expected failure");
-                } catch (FlinkRuntimeException ex) {
-                    assertTrue(
-                            ExceptionUtils.findThrowable(ex, 
ExpectedTestException.class)
-                                    .isPresent());
-                }
+                assertThatThrownBy(() -> jResult.getAccumulatorResult(name))
+                        .isInstanceOf(FlinkRuntimeException.class)
+                        .hasCauseInstanceOf(ExpectedTestException.class);
+
+                assertThatThrownBy(() -> 
jResultCopied.getAccumulatorResult(name))
+                        .isInstanceOf(FlinkRuntimeException.class)
+                        .hasCauseInstanceOf(ExpectedTestException.class);
             } else {
-                assertEquals(value.get(), jResult.getAccumulatorResult(name));
-                assertEquals(value.get(), 
jResultCopied.getAccumulatorResult(name));
+                assertThat((Object) 
jResult.getAccumulatorResult(name)).isEqualTo(value.get());
+                assertThat((Object) jResultCopied.getAccumulatorResult(name))
+                        .isEqualTo(value.get());
             }
         }
     }
 
     @Test
-    public void testSerializationWithNullValues() throws Exception {
+    void testSerializationWithNullValues() throws Exception {
         SerializedJobExecutionResult result = new 
SerializedJobExecutionResult(null, 0L, null);
         SerializedJobExecutionResult cloned = 
CommonTestUtils.createCopySerializable(result);
 
-        assertNull(cloned.getJobId());
-        assertEquals(0L, cloned.getNetRuntime());
-        assertNull(cloned.getSerializedAccumulatorResults());
+        assertThat(cloned.getJobId()).isNull();
+        assertThat(cloned.getNetRuntime()).isZero();
+        assertThat(cloned.getSerializedAccumulatorResults()).isNull();
 
         JobExecutionResult jResult = 
result.toJobExecutionResult(getClass().getClassLoader());
-        assertNull(jResult.getJobID());
-        assertTrue(jResult.getAllAccumulatorResults().isEmpty());
+        assertThat(jResult.getJobID()).isNull();
+        assertThat(jResult.getAllAccumulatorResults()).isEmpty();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
index 6cc225d0001..507c0a4fe26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -34,64 +32,63 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link ApplicationStatus}. */
-@ExtendWith(TestLoggerExtension.class)
-public class ApplicationStatusTest {
+class ApplicationStatusTest {
 
     private static final int SUCCESS_EXIT_CODE = 0;
 
     @Test
-    public void succeededStatusMapsToSuccessExitCode() {
+    void succeededStatusMapsToSuccessExitCode() {
         int exitCode = ApplicationStatus.SUCCEEDED.processExitCode();
         assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
     }
 
     @Test
-    public void cancelledStatusMapsToSuccessExitCode() {
+    void cancelledStatusMapsToSuccessExitCode() {
         int exitCode = ApplicationStatus.CANCELED.processExitCode();
         assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
     }
 
     @Test
-    public void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
+    void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
         Iterable<Integer> exitCodes = 
exitCodes(notSucceededNorCancelledStatus());
         assertThat(exitCodes).doesNotContain(SUCCESS_EXIT_CODE);
     }
 
     @Test
-    public void testJobStatusFromSuccessApplicationStatus() {
+    void testJobStatusFromSuccessApplicationStatus() {
         
assertThat(ApplicationStatus.SUCCEEDED.deriveJobStatus()).isEqualTo(JobStatus.FINISHED);
     }
 
     @Test
-    public void testJobStatusFromFailedApplicationStatus() {
+    void testJobStatusFromFailedApplicationStatus() {
         
assertThat(ApplicationStatus.FAILED.deriveJobStatus()).isEqualTo(JobStatus.FAILED);
     }
 
     @Test
-    public void testJobStatusFromCancelledApplicationStatus() {
+    void testJobStatusFromCancelledApplicationStatus() {
         
assertThat(ApplicationStatus.CANCELED.deriveJobStatus()).isEqualTo(JobStatus.CANCELED);
     }
 
     @Test
-    public void testJobStatusFailsFromUnknownApplicationStatuses() {
+    void testJobStatusFailsFromUnknownApplicationStatuses() {
         assertThatThrownBy(ApplicationStatus.UNKNOWN::deriveJobStatus)
                 .isInstanceOf(UnsupportedOperationException.class);
     }
 
     @Test
-    public void testSuccessApplicationStatusFromJobStatus() {
+    void testSuccessApplicationStatusFromJobStatus() {
         assertThat(ApplicationStatus.fromJobStatus(JobStatus.FINISHED))
                 .isEqualTo(ApplicationStatus.SUCCEEDED);
     }
 
     @Test
-    public void testFailedApplicationStatusFromJobStatus() {
+    void testFailedApplicationStatusFromJobStatus() {
         assertThat(ApplicationStatus.fromJobStatus(JobStatus.FAILED))
                 .isEqualTo(ApplicationStatus.FAILED);
     }
 
     @Test
-    public void testCancelledApplicationStatusFromJobStatus() {
+    void testCancelledApplicationStatusFromJobStatus() {
         assertThat(ApplicationStatus.fromJobStatus(JobStatus.CANCELED))
                 .isEqualTo(ApplicationStatus.CANCELED);
     }
@@ -114,7 +111,7 @@ public class ApplicationStatusTest {
     }
 
     @Test
-    public void testUnknownApplicationStatusForMissingJobStatus() {
+    void testUnknownApplicationStatusForMissingJobStatus() {
         
assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 2012494d5bb..2315595e7c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -27,15 +27,11 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,21 +43,15 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link BootstrapTools}. */
-public class BootstrapToolsTest extends TestLogger {
+class BootstrapToolsTest {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(BootstrapToolsTest.class);
-
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private static java.nio.file.Path temporaryFolder;
 
     @Test
-    public void testSubstituteConfigKey() {
+    void testSubstituteConfigKey() {
         String deprecatedKey1 = "deprecated-key";
         String deprecatedKey2 = "another-out_of-date_key";
         String deprecatedKey3 = "yet-one-more";
@@ -85,18 +75,18 @@ public class BootstrapToolsTest extends TestLogger {
         BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey3, 
designatedKey3);
 
         // value 1 should be set to designated
-        assertEquals(value1, cfg.getString(designatedKey1, null));
+        assertThat(cfg.getString(designatedKey1, null)).isEqualTo(value1);
 
         // value 2 should not have been set, since it had a value already
-        assertEquals(value2Designated, cfg.getString(designatedKey2, null));
+        assertThat(cfg.getString(designatedKey2, 
null)).isEqualTo(value2Designated);
 
         // nothing should be in there for key 3
-        assertNull(cfg.getString(designatedKey3, null));
-        assertNull(cfg.getString(deprecatedKey3, null));
+        assertThat(cfg.getString(designatedKey3, null)).isNull();
+        assertThat(cfg.getString(deprecatedKey3, null)).isNull();
     }
 
     @Test
-    public void testSubstituteConfigKeyPrefix() {
+    void testSubstituteConfigKeyPrefix() {
         String deprecatedPrefix1 = "deprecated-prefix";
         String deprecatedPrefix2 = "-prefix-2";
         String deprecatedPrefix3 = "prefix-3";
@@ -129,19 +119,19 @@ public class BootstrapToolsTest extends TestLogger {
         BootstrapTools.substituteDeprecatedConfigPrefix(cfg, 
deprecatedPrefix2, designatedPrefix2);
         BootstrapTools.substituteDeprecatedConfigPrefix(cfg, 
deprecatedPrefix3, designatedPrefix3);
 
-        assertEquals(val1, cfg.getString(desig1, null));
-        assertEquals(val2, cfg.getString(desig2, null));
-        assertEquals(val3Desig, cfg.getString(desig3, null));
+        assertThat(cfg.getString(desig1, null)).isEqualTo(val1);
+        assertThat(cfg.getString(desig2, null)).isEqualTo(val2);
+        assertThat(cfg.getString(desig3, null)).isEqualTo(val3Desig);
 
         // check that nothing with prefix 3 is contained
         for (String key : cfg.keySet()) {
-            assertFalse(key.startsWith(designatedPrefix3));
-            assertFalse(key.startsWith(deprecatedPrefix3));
+            assertThat(key.startsWith(designatedPrefix3)).isFalse();
+            assertThat(key.startsWith(deprecatedPrefix3)).isFalse();
         }
     }
 
     @Test
-    public void testGetTaskManagerShellCommand() {
+    void testGetTaskManagerShellCommand() {
         final Configuration cfg = new Configuration();
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                 new TaskExecutorProcessSpec(
@@ -156,8 +146,7 @@ public class BootstrapToolsTest extends TestLogger {
                         new MemorySize(0), // jvmOverheadSize
                         Collections.emptyList());
         final ContaineredTaskManagerParameters containeredParams =
-                new ContaineredTaskManagerParameters(
-                        taskExecutorProcessSpec, new HashMap<String, 
String>());
+                new ContaineredTaskManagerParameters(taskExecutorProcessSpec, 
new HashMap<>());
 
         // no logging, with/out krb5
         final String java = "$JAVA_HOME/bin/java";
@@ -178,407 +167,423 @@ public class BootstrapToolsTest extends TestLogger {
         final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
         final String redirects = "1> ./logs/taskmanager.out 2> 
./logs/taskmanager.err";
 
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        mainClass,
-                        dynamicConfigs,
-                        basicArgs,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        false,
-                        false,
-                        false,
-                        this.getClass(),
-                        ""));
-
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        false,
-                        false,
-                        false,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                false,
+                                false,
+                                this.getClass(),
+                                ""))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                mainClass,
+                                dynamicConfigs,
+                                basicArgs,
+                                redirects));
+
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                false,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                mainClass,
+                                args,
+                                redirects));
 
         final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        false,
-                        false,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                false,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                mainClass,
+                                args,
+                                redirects));
 
         // logback only, with/out krb5
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        logfile,
-                        logback,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        false,
-                        false,
-                        this.getClass(),
-                        mainArgs));
-
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        logfile,
-                        logback,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        false,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                false,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                false,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                mainClass,
+                                args,
+                                redirects));
 
         // log4j, with/out krb5
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        logfile,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        false,
-                        true,
-                        false,
-                        this.getClass(),
-                        mainArgs));
-
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        logfile,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        false,
-                        true,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                false,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
 
         // logback + log4j, with/out krb5
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        logfile,
-                        logback,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        false,
-                        this.getClass(),
-                        mainArgs));
-
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        logfile,
-                        logback,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
 
         // logback + log4j, with/out krb5, different JVM opts
         cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        jvmOpts,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        logfile,
-                        logback,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        false,
-                        this.getClass(),
-                        mainArgs));
-
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        jvmOpts,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        logfile,
-                        logback,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
 
         // logback + log4j, with/out krb5, different JVM opts
         cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        jvmOpts,
-                        tmJvmOpts,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        logfile,
-                        logback,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        false,
-                        this.getClass(),
-                        mainArgs));
-
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        jvmmem,
-                        jvmOpts,
-                        tmJvmOpts,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        logfile,
-                        logback,
-                        log4j,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                false,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                tmJvmOpts,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
+
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                jvmmem,
+                                jvmOpts,
+                                tmJvmOpts,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                logfile,
+                                logback,
+                                log4j,
+                                mainClass,
+                                args,
+                                redirects));
 
         // now try some configurations with different 
yarn.container-start-command-template
 
         cfg.setString(
                 ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                 "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 
6 %redirects%");
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        "1",
-                        jvmmem,
-                        "2",
-                        jvmOpts,
-                        tmJvmOpts,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        "3",
-                        logfile,
-                        logback,
-                        log4j,
-                        "4",
-                        mainClass,
-                        "5",
-                        args,
-                        "6",
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                "1",
+                                jvmmem,
+                                "2",
+                                jvmOpts,
+                                tmJvmOpts,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                "3",
+                                logfile,
+                                logback,
+                                log4j,
+                                "4",
+                                mainClass,
+                                "5",
+                                args,
+                                "6",
+                                redirects));
 
         cfg.setString(
                 ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                 "%java% %logging% %jvmopts% %jvmmem% %class% %args% 
%redirects%");
-        assertEquals(
-                String.join(
-                        " ",
-                        java,
-                        logfile,
-                        logback,
-                        log4j,
-                        jvmOpts,
-                        tmJvmOpts,
-                        BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
-                        krb5,
-                        jvmmem,
-                        mainClass,
-                        args,
-                        redirects),
-                BootstrapTools.getTaskManagerShellCommand(
-                        cfg,
-                        containeredParams,
-                        "./conf",
-                        "./logs",
-                        true,
-                        true,
-                        true,
-                        this.getClass(),
-                        mainArgs));
+        assertThat(
+                        BootstrapTools.getTaskManagerShellCommand(
+                                cfg,
+                                containeredParams,
+                                "./conf",
+                                "./logs",
+                                true,
+                                true,
+                                true,
+                                this.getClass(),
+                                mainArgs))
+                .isEqualTo(
+                        String.join(
+                                " ",
+                                java,
+                                logfile,
+                                logback,
+                                log4j,
+                                jvmOpts,
+                                tmJvmOpts,
+                                BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+                                krb5,
+                                jvmmem,
+                                mainClass,
+                                args,
+                                redirects));
     }
 
     @Test
-    public void testUpdateTmpDirectoriesInConfiguration() {
+    void testUpdateTmpDirectoriesInConfiguration() {
         Configuration config = new Configuration();
 
         // test that default value is taken
         BootstrapTools.updateTmpDirectoriesInConfiguration(config, 
"default/directory/path");
-        assertEquals(config.getString(CoreOptions.TMP_DIRS), 
"default/directory/path");
+        
assertThat(config.getString(CoreOptions.TMP_DIRS)).isEqualTo("default/directory/path");
 
         // test that we ignore default value is value is set before
         BootstrapTools.updateTmpDirectoriesInConfiguration(config, 
"not/default/directory/path");
-        assertEquals(config.getString(CoreOptions.TMP_DIRS), 
"default/directory/path");
+        
assertThat(config.getString(CoreOptions.TMP_DIRS)).isEqualTo("default/directory/path");
 
         // test that empty value is not a magic string
         config.setString(CoreOptions.TMP_DIRS, "");
         BootstrapTools.updateTmpDirectoriesInConfiguration(config, 
"some/new/path");
-        assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
+        assertThat(config.getString(CoreOptions.TMP_DIRS)).isEmpty();
     }
 
     @Test
-    public void 
testShouldNotUpdateTmpDirectoriesInConfigurationIfNoValueConfigured() {
+    void testShouldNotUpdateTmpDirectoriesInConfigurationIfNoValueConfigured() 
{
         Configuration config = new Configuration();
         BootstrapTools.updateTmpDirectoriesInConfiguration(config, null);
-        assertEquals(config.getString(CoreOptions.TMP_DIRS), 
CoreOptions.TMP_DIRS.defaultValue());
+        assertThat(CoreOptions.TMP_DIRS.defaultValue())
+                .isEqualTo(config.getString(CoreOptions.TMP_DIRS));
     }
 
     @Test
-    public void testGetDynamicPropertiesAsString() {
+    void testGetDynamicPropertiesAsString() {
         final Configuration baseConfig = new Configuration();
         baseConfig.setString("key.a", "a");
         baseConfig.setString("key.b", "b1");
@@ -590,67 +595,67 @@ public class BootstrapToolsTest extends TestLogger {
         final String dynamicProperties =
                 BootstrapTools.getDynamicPropertiesAsString(baseConfig, 
targetConfig);
         if (OperatingSystem.isWindows()) {
-            assertEquals("-Dkey.b=\"b2\" -Dkey.c=\"c\"", dynamicProperties);
+            assertThat(dynamicProperties).isEqualTo("-Dkey.b=\"b2\" 
-Dkey.c=\"c\"");
         } else {
-            assertEquals("-Dkey.b='b2' -Dkey.c='c'", dynamicProperties);
+            assertThat(dynamicProperties).isEqualTo("-Dkey.b='b2' 
-Dkey.c='c'");
         }
     }
 
     @Test
-    public void testEscapeDynamicPropertyValueWithSingleQuote() {
+    void testEscapeDynamicPropertyValueWithSingleQuote() {
         final String value1 = "#a,b&c^d*e@f(g!h";
-        assertEquals("'" + value1 + "'", 
BootstrapTools.escapeWithSingleQuote(value1));
+        assertThat(BootstrapTools.escapeWithSingleQuote(value1)).isEqualTo("'" 
+ value1 + "'");
 
         final String value2 = "'foobar";
-        assertEquals("''\\''foobar'", 
BootstrapTools.escapeWithSingleQuote(value2));
+        
assertThat(BootstrapTools.escapeWithSingleQuote(value2)).isEqualTo("''\\''foobar'");
 
         final String value3 = "foo''bar";
-        assertEquals("'foo'\\'''\\''bar'", 
BootstrapTools.escapeWithSingleQuote(value3));
+        
assertThat(BootstrapTools.escapeWithSingleQuote(value3)).isEqualTo("'foo'\\'''\\''bar'");
 
         final String value4 = "'foo' 'bar'";
-        assertEquals("''\\''foo'\\'' '\\''bar'\\'''", 
BootstrapTools.escapeWithSingleQuote(value4));
+        assertThat(BootstrapTools.escapeWithSingleQuote(value4))
+                .isEqualTo("''\\''foo'\\'' '\\''bar'\\'''");
     }
 
     @Test
-    public void testEscapeDynamicPropertyValueWithDoubleQuote() {
+    void testEscapeDynamicPropertyValueWithDoubleQuote() {
         final String value1 = "#a,b&c^d*e@f(g!h";
-        assertEquals("\"#a,b&c\"^^\"d*e@f(g!h\"", 
BootstrapTools.escapeWithDoubleQuote(value1));
+        assertThat(BootstrapTools.escapeWithDoubleQuote(value1))
+                .isEqualTo("\"#a,b&c\"^^\"d*e@f(g!h\"");
 
         final String value2 = "foo\"bar'";
-        assertEquals("\"foo\\\"bar'\"", 
BootstrapTools.escapeWithDoubleQuote(value2));
+        
assertThat(BootstrapTools.escapeWithDoubleQuote(value2)).isEqualTo("\"foo\\\"bar'\"");
 
         final String value3 = "\"foo\" \"bar\"";
-        assertEquals("\"\\\"foo\\\" \\\"bar\\\"\"", 
BootstrapTools.escapeWithDoubleQuote(value3));
+        assertThat(BootstrapTools.escapeWithDoubleQuote(value3))
+                .isEqualTo("\"\\\"foo\\\" \\\"bar\\\"\"");
     }
 
     @Test
-    public void testGetEnvironmentVariables() {
+    void testGetEnvironmentVariables() {
         Configuration testConf = new Configuration();
         testConf.setString("containerized.master.env.LD_LIBRARY_PATH", 
"/usr/lib/native");
 
         Map<String, String> res =
                 
ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", 
testConf);
 
-        Assert.assertEquals(1, res.size());
-        Map.Entry<String, String> entry = res.entrySet().iterator().next();
-        Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
-        Assert.assertEquals("/usr/lib/native", entry.getValue());
+        assertThat(res).hasSize(1).containsEntry("LD_LIBRARY_PATH", 
"/usr/lib/native");
     }
 
     @Test
-    public void testGetEnvironmentVariablesErroneous() {
+    void testGetEnvironmentVariablesErroneous() {
         Configuration testConf = new Configuration();
         testConf.setString("containerized.master.env.", "/usr/lib/native");
 
         Map<String, String> res =
                 
ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", 
testConf);
 
-        Assert.assertEquals(0, res.size());
+        assertThat(res).isEmpty();
     }
 
     @Test
-    public void testWriteConfigurationAndReload() throws IOException {
-        final File flinkConfDir = 
temporaryFolder.newFolder().getAbsoluteFile();
+    void testWriteConfigurationAndReload() throws IOException {
+        final File flinkConfDir = 
TempDirUtils.newFolder(temporaryFolder).getAbsoluteFile();
         final Configuration flinkConfig = new Configuration();
 
         final ConfigOption<List<String>> listStringConfigOption =
@@ -658,7 +663,8 @@ public class BootstrapToolsTest extends TestLogger {
         final List<String> list =
                 Arrays.asList("A,B,C,D", "A'B'C'D", "A;BCD", "AB\"C\"D", 
"AB'\"D:B");
         flinkConfig.set(listStringConfigOption, list);
-        assertThat(flinkConfig.get(listStringConfigOption), 
containsInAnyOrder(list.toArray()));
+        assertThat(flinkConfig.get(listStringConfigOption))
+                .containsExactlyInAnyOrderElementsOf(list);
 
         final ConfigOption<List<Duration>> listDurationConfigOption =
                 ConfigOptions.key("test-list-duration-key")
@@ -668,9 +674,8 @@ public class BootstrapToolsTest extends TestLogger {
         final List<Duration> durationList =
                 Arrays.asList(Duration.ofSeconds(3), Duration.ofMinutes(1));
         flinkConfig.set(listDurationConfigOption, durationList);
-        assertThat(
-                flinkConfig.get(listDurationConfigOption),
-                containsInAnyOrder(durationList.toArray()));
+        assertThat(flinkConfig.get(listDurationConfigOption))
+                .containsExactlyInAnyOrderElementsOf(durationList);
 
         final ConfigOption<Map<String, String>> mapConfigOption =
                 ConfigOptions.key("test-map-key").mapType().noDefaultValue();
@@ -681,27 +686,22 @@ public class BootstrapToolsTest extends TestLogger {
         map.put("key4", "AB\"C\"D");
         map.put("key5", "AB'\"D:B");
         flinkConfig.set(mapConfigOption, map);
-        assertThat(
-                flinkConfig.get(mapConfigOption).entrySet(),
-                containsInAnyOrder(map.entrySet().toArray()));
+        assertThat(flinkConfig.get(mapConfigOption)).containsAllEntriesOf(map);
 
         final ConfigOption<Duration> durationConfigOption =
                 
ConfigOptions.key("test-duration-key").durationType().noDefaultValue();
         final Duration duration = Duration.ofMillis(3000);
         flinkConfig.set(durationConfigOption, duration);
-        assertEquals(duration, flinkConfig.get(durationConfigOption));
+        assertThat(flinkConfig.get(durationConfigOption)).isEqualTo(duration);
 
         BootstrapTools.writeConfiguration(flinkConfig, new File(flinkConfDir, 
FLINK_CONF_FILENAME));
         final Configuration loadedFlinkConfig =
                 
GlobalConfiguration.loadConfiguration(flinkConfDir.getAbsolutePath());
-        assertThat(
-                loadedFlinkConfig.get(listStringConfigOption), 
containsInAnyOrder(list.toArray()));
-        assertThat(
-                loadedFlinkConfig.get(listDurationConfigOption),
-                containsInAnyOrder(durationList.toArray()));
-        assertThat(
-                loadedFlinkConfig.get(mapConfigOption).entrySet(),
-                containsInAnyOrder(map.entrySet().toArray()));
-        assertEquals(duration, loadedFlinkConfig.get(durationConfigOption));
+        assertThat(loadedFlinkConfig.get(listStringConfigOption))
+                .containsExactlyInAnyOrderElementsOf(list);
+        assertThat(loadedFlinkConfig.get(listDurationConfigOption))
+                .containsExactlyInAnyOrderElementsOf(durationList);
+        
assertThat(loadedFlinkConfig.get(mapConfigOption)).containsAllEntriesOf(map);
+        
assertThat(loadedFlinkConfig.get(durationConfigOption)).isEqualTo(duration);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
index 3ea74f6a47f..377cbf02180 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtilsTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Map;
@@ -39,17 +39,12 @@ import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_LEGACY_HEAP_OPTIONS;
 import static 
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link TaskExecutorProcessUtils}. */
-public class TaskExecutorProcessUtilsTest
-        extends ProcessMemoryUtilsTestBase<TaskExecutorProcessSpec> {
+class TaskExecutorProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<TaskExecutorProcessSpec> {
 
     private static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
     private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
@@ -81,77 +76,72 @@ public class TaskExecutorProcessUtilsTest
     }
 
     @Test
-    public void testGenerateDynamicConfigurations() {
+    void testGenerateDynamicConfigurations() {
         String dynamicConfigsStr =
                 
TaskExecutorProcessUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
         Map<String, String> configs =
                 
ConfigurationUtils.parseTmResourceDynamicConfigs(dynamicConfigsStr);
 
         assertThat(
-                new 
CPUResource(Double.valueOf(configs.get(TaskManagerOptions.CPU_CORES.key()))),
-                is(TM_RESOURCE_SPEC.getCpuCores()));
+                        new CPUResource(
+                                Double.parseDouble(
+                                        
configs.get(TaskManagerOptions.CPU_CORES.key()))))
+                .isEqualTo(TM_RESOURCE_SPEC.getCpuCores());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getFrameworkHeapSize());
         assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
-                is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+                        MemorySize.parse(
+                                
configs.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getFrameworkOffHeapMemorySize());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getTaskHeapSize());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getTaskOffHeapSize());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MAX.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getNetworkMemSize());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MIN.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getNetworkMemSize());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getManagedMemorySize());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.JVM_METASPACE.key())))
+                
.isEqualTo(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getMetaspace());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MIN.key())))
+                
.isEqualTo(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead());
+        
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MAX.key())))
+                
.isEqualTo(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead());
+        
assertThat(Integer.valueOf(configs.get(TaskManagerOptions.NUM_TASK_SLOTS.key())))
+                .isEqualTo(TM_RESOURCE_SPEC.getNumSlots());
+        
assertThat(configs.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key()))
+                .isEqualTo(
+                        '"'
+                                + String.join(";", 
TM_RESOURCE_SPEC.getExtendedResources().keySet())
+                                + '"');
         assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key())),
-                is(TM_RESOURCE_SPEC.getFrameworkOffHeapMemorySize()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
-                is(TM_RESOURCE_SPEC.getTaskHeapSize()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
-                is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MAX.key())),
-                is(TM_RESOURCE_SPEC.getNetworkMemSize()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MIN.key())),
-                is(TM_RESOURCE_SPEC.getNetworkMemSize()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
-                is(TM_RESOURCE_SPEC.getManagedMemorySize()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.JVM_METASPACE.key())),
-                
is(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getMetaspace()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MIN.key())),
-                
is(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead()));
-        assertThat(
-                
MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MAX.key())),
-                
is(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead()));
-        assertThat(
-                
Integer.valueOf(configs.get(TaskManagerOptions.NUM_TASK_SLOTS.key())),
-                is(TM_RESOURCE_SPEC.getNumSlots()));
-        assertThat(
-                
configs.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key()),
-                is('"' + String.join(";", 
TM_RESOURCE_SPEC.getExtendedResources().keySet()) + '"'));
-        assertThat(
-                configs.get(
-                        
ExternalResourceOptions.getAmountConfigOptionForResource(
-                                EXTERNAL_RESOURCE_NAME_1)),
-                is(
+                        configs.get(
+                                
ExternalResourceOptions.getAmountConfigOptionForResource(
+                                        EXTERNAL_RESOURCE_NAME_1)))
+                .isEqualTo(
                         String.valueOf(
                                 TM_RESOURCE_SPEC
                                         .getExtendedResources()
                                         .get(EXTERNAL_RESOURCE_NAME_1)
                                         .getValue()
-                                        .longValue())));
+                                        .longValue()));
         assertThat(
-                configs.get(
-                        
ExternalResourceOptions.getAmountConfigOptionForResource(
-                                EXTERNAL_RESOURCE_NAME_2)),
-                is(
+                        configs.get(
+                                
ExternalResourceOptions.getAmountConfigOptionForResource(
+                                        EXTERNAL_RESOURCE_NAME_2)))
+                .isEqualTo(
                         String.valueOf(
                                 TM_RESOURCE_SPEC
                                         .getExtendedResources()
                                         .get(EXTERNAL_RESOURCE_NAME_2)
                                         .getValue()
-                                        .longValue())));
+                                        .longValue()));
     }
 
     @Test
-    public void testProcessSpecFromWorkerResourceSpec() {
+    void testProcessSpecFromWorkerResourceSpec() {
         final WorkerResourceSpec workerResourceSpec =
                 new WorkerResourceSpec.Builder()
                         .setCpuCores(1.0)
@@ -165,26 +155,24 @@ public class TaskExecutorProcessUtilsTest
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                         new Configuration(), workerResourceSpec);
-        assertEquals(workerResourceSpec.getCpuCores(), 
taskExecutorProcessSpec.getCpuCores());
-        assertEquals(
-                workerResourceSpec.getTaskHeapSize(), 
taskExecutorProcessSpec.getTaskHeapSize());
-        assertEquals(
-                workerResourceSpec.getTaskOffHeapSize(),
-                taskExecutorProcessSpec.getTaskOffHeapSize());
-        assertEquals(
-                workerResourceSpec.getNetworkMemSize(),
-                taskExecutorProcessSpec.getNetworkMemSize());
-        assertEquals(
-                workerResourceSpec.getManagedMemSize(),
-                taskExecutorProcessSpec.getManagedMemorySize());
-        assertEquals(workerResourceSpec.getNumSlots(), 
taskExecutorProcessSpec.getNumSlots());
-        assertEquals(
-                workerResourceSpec.getExtendedResources(),
-                taskExecutorProcessSpec.getExtendedResources());
+        assertThat(taskExecutorProcessSpec.getCpuCores())
+                .isEqualTo(workerResourceSpec.getCpuCores());
+        assertThat(taskExecutorProcessSpec.getTaskHeapSize())
+                .isEqualTo(workerResourceSpec.getTaskHeapSize());
+        assertThat(taskExecutorProcessSpec.getTaskOffHeapSize())
+                .isEqualTo(workerResourceSpec.getTaskOffHeapSize());
+        assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+                .isEqualTo(workerResourceSpec.getNetworkMemSize());
+        assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+                .isEqualTo(workerResourceSpec.getManagedMemSize());
+        assertThat(taskExecutorProcessSpec.getNumSlots())
+                .isEqualTo(workerResourceSpec.getNumSlots());
+        assertThat(taskExecutorProcessSpec.getExtendedResources())
+                .isEqualTo(workerResourceSpec.getExtendedResources());
     }
 
     @Test
-    public void testConfigCpuCores() {
+    void testConfigCpuCores() {
         final double cpuCores = 1.0;
 
         Configuration conf = new Configuration();
@@ -193,31 +181,30 @@ public class TaskExecutorProcessUtilsTest
         validateInAllConfigurations(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getCpuCores(),
-                                is(new CPUResource(cpuCores))));
+                        assertThat(taskExecutorProcessSpec.getCpuCores())
+                                .isEqualTo(new CPUResource(cpuCores)));
     }
 
     @Test
-    public void testConfigNoCpuCores() {
+    void testConfigNoCpuCores() {
         Configuration conf = new Configuration();
         conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
         validateInAllConfigurations(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getCpuCores(), is(new 
CPUResource(3.0))));
+                        assertThat(taskExecutorProcessSpec.getCpuCores())
+                                .isEqualTo(new CPUResource(3.0)));
     }
 
     @Test
-    public void testConfigNegativeCpuCores() {
+    void testConfigNegativeCpuCores() {
         Configuration conf = new Configuration();
         conf.setDouble(TaskManagerOptions.CPU_CORES, -0.1f);
         validateFailInAllConfigurations(conf);
     }
 
     @Test
-    public void testConfigFrameworkHeapMemory() {
+    void testConfigFrameworkHeapMemory() {
         final MemorySize frameworkHeapSize = MemorySize.parse("100m");
 
         Configuration conf = new Configuration();
@@ -226,13 +213,12 @@ public class TaskExecutorProcessUtilsTest
         validateInAllConfigurations(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getFrameworkHeapSize(),
-                                is(frameworkHeapSize)));
+                        
assertThat(taskExecutorProcessSpec.getFrameworkHeapSize())
+                                .isEqualTo(frameworkHeapSize));
     }
 
     @Test
-    public void testConfigFrameworkOffHeapMemory() {
+    void testConfigFrameworkOffHeapMemory() {
         final MemorySize frameworkOffHeapSize = MemorySize.parse("10m");
 
         Configuration conf = new Configuration();
@@ -241,13 +227,12 @@ public class TaskExecutorProcessUtilsTest
         validateInAllConfigurations(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                
taskExecutorProcessSpec.getFrameworkOffHeapMemorySize(),
-                                is(frameworkOffHeapSize)));
+                        
assertThat(taskExecutorProcessSpec.getFrameworkOffHeapMemorySize())
+                                .isEqualTo(frameworkOffHeapSize));
     }
 
     @Test
-    public void testConfigTaskHeapMemory() {
+    void testConfigTaskHeapMemory() {
         final MemorySize taskHeapSize = MemorySize.parse("50m");
 
         Configuration conf = new Configuration();
@@ -258,11 +243,12 @@ public class TaskExecutorProcessUtilsTest
         validateInConfigurationsWithoutExplicitTaskHeapMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(taskExecutorProcessSpec.getTaskHeapSize(), 
is(taskHeapSize)));
+                        assertThat(taskExecutorProcessSpec.getTaskHeapSize())
+                                .isEqualTo(taskHeapSize));
     }
 
     @Test
-    public void testConfigTaskOffheapMemory() {
+    void testConfigTaskOffheapMemory() {
         final MemorySize taskOffHeapSize = MemorySize.parse("50m");
 
         Configuration conf = new Configuration();
@@ -271,12 +257,12 @@ public class TaskExecutorProcessUtilsTest
         validateInAllConfigurations(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getTaskOffHeapSize(), 
is(taskOffHeapSize)));
+                        
assertThat(taskExecutorProcessSpec.getTaskOffHeapSize())
+                                .isEqualTo(taskOffHeapSize));
     }
 
     @Test
-    public void testConfigNetworkMemoryRange() {
+    void testConfigNetworkMemoryRange() {
         final MemorySize networkMin = MemorySize.parse("200m");
         final MemorySize networkMax = MemorySize.parse("500m");
 
@@ -287,18 +273,15 @@ public class TaskExecutorProcessUtilsTest
         validateInAllConfigurations(
                 conf,
                 taskExecutorProcessSpec -> {
-                    assertThat(
-                            
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
-                            greaterThanOrEqualTo(networkMin.getBytes()));
-                    assertThat(
-                            
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
-                            lessThanOrEqualTo(networkMax.getBytes()));
+                    
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+                            .isGreaterThanOrEqualTo(networkMin.getBytes());
+                    
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+                            .isLessThanOrEqualTo(networkMax.getBytes());
                 });
     }
 
     @Test
-    public void
-            
testConsistencyCheckOfDerivedNetworkMemoryWithinMinMaxRangeNotMatchingFractionPasses()
 {
+    void 
testConsistencyCheckOfDerivedNetworkMemoryWithinMinMaxRangeNotMatchingFractionPasses()
 {
         final Configuration configuration =
                 setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(400);
         // set fraction to be extremely low to not match the derived network 
memory
@@ -307,27 +290,30 @@ public class TaskExecutorProcessUtilsTest
         TaskExecutorProcessUtils.processSpecFromConfig(configuration);
     }
 
-    @Test(expected = IllegalConfigurationException.class)
+    @Test
     public void testConsistencyCheckOfDerivedNetworkMemoryLessThanMinFails() {
         final Configuration configuration =
                 setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(500);
         configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, 
MemorySize.parse("900m"));
         configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, 
MemorySize.parse("1000m"));
+
         // internal validation should fail
-        TaskExecutorProcessUtils.processSpecFromConfig(configuration);
+        assertThatExceptionOfType(IllegalConfigurationException.class)
+                .isThrownBy(() -> 
TaskExecutorProcessUtils.processSpecFromConfig(configuration));
     }
 
-    @Test(expected = IllegalConfigurationException.class)
+    @Test
     public void 
testConsistencyCheckOfDerivedNetworkMemoryGreaterThanMaxFails() {
         final Configuration configuration =
                 setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(500);
         configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, 
MemorySize.parse("100m"));
         configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, 
MemorySize.parse("150m"));
         // internal validation should fail
-        TaskExecutorProcessUtils.processSpecFromConfig(configuration);
+        assertThatExceptionOfType(IllegalConfigurationException.class)
+                .isThrownBy(() -> 
TaskExecutorProcessUtils.processSpecFromConfig(configuration));
     }
 
-    @Test(expected = IllegalConfigurationException.class)
+    @Test
     public void 
testConsistencyCheckOfDerivedNetworkMemoryDoesNotMatchLegacyConfigFails() {
         final int numberOfNetworkBuffers = 10;
         final int pageSizeMb = 16;
@@ -340,7 +326,8 @@ public class TaskExecutorProcessUtilsTest
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 
numberOfNetworkBuffers);
         // internal validation should fail
-        TaskExecutorProcessUtils.processSpecFromConfig(configuration);
+        assertThatExceptionOfType(IllegalConfigurationException.class)
+                .isThrownBy(() -> 
TaskExecutorProcessUtils.processSpecFromConfig(configuration));
     }
 
     private static Configuration 
setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(
@@ -376,15 +363,14 @@ public class TaskExecutorProcessUtilsTest
 
         final TaskExecutorProcessSpec adjusteedTaskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(conf);
-        assertThat(
-                
adjusteedTaskExecutorProcessSpec.getNetworkMemSize().getMebiBytes(),
-                is(networkMemorySizeToDeriveMb));
+        
assertThat(adjusteedTaskExecutorProcessSpec.getNetworkMemSize().getMebiBytes())
+                .isEqualTo(networkMemorySizeToDeriveMb);
 
         return conf;
     }
 
     @Test
-    public void testConfigNetworkMemoryRangeFailure() {
+    void testConfigNetworkMemoryRangeFailure() {
         final MemorySize networkMin = MemorySize.parse("200m");
         final MemorySize networkMax = MemorySize.parse("50m");
 
@@ -396,7 +382,7 @@ public class TaskExecutorProcessUtilsTest
     }
 
     @Test
-    public void testConfigNetworkMemoryFraction() {
+    void testConfigNetworkMemoryFraction() {
         final MemorySize networkMin = MemorySize.ZERO;
         final MemorySize networkMax = MemorySize.parse("1t");
         final float fraction = 0.2f;
@@ -413,16 +399,15 @@ public class TaskExecutorProcessUtilsTest
         validateInConfigWithExplicitTaskHeapAndManagedMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getNetworkMemSize(),
-                                is(
+                        assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+                                .isEqualTo(
                                         taskExecutorProcessSpec
                                                 .getTotalFlinkMemorySize()
-                                                .multiply(fraction))));
+                                                .multiply(fraction)));
     }
 
     @Test
-    public void testConfigNetworkMemoryFractionFailure() {
+    void testConfigNetworkMemoryFractionFailure() {
         Configuration conf = new Configuration();
         conf.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, -0.1f);
         validateFailInAllConfigurations(conf);
@@ -432,7 +417,7 @@ public class TaskExecutorProcessUtilsTest
     }
 
     @Test
-    public void testConfigNetworkMemoryLegacyRangeFraction() {
+    void testConfigNetworkMemoryLegacyRangeFraction() {
         final MemorySize networkMin = MemorySize.parse("200m");
         final MemorySize networkMax = MemorySize.parse("500m");
 
@@ -455,12 +440,10 @@ public class TaskExecutorProcessUtilsTest
         validateInAllConfigurations(
                 conf,
                 taskExecutorProcessSpec -> {
-                    assertThat(
-                            
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
-                            greaterThanOrEqualTo(networkMin.getBytes()));
-                    assertThat(
-                            
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
-                            lessThanOrEqualTo(networkMax.getBytes()));
+                    
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+                            .isGreaterThanOrEqualTo(networkMin.getBytes());
+                    
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+                            .isLessThanOrEqualTo(networkMax.getBytes());
                 });
 
         conf.setString(legacyOptionMin, "0m");
@@ -470,16 +453,15 @@ public class TaskExecutorProcessUtilsTest
         validateInConfigWithExplicitTaskHeapAndManagedMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getNetworkMemSize(),
-                                is(
+                        assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+                                .isEqualTo(
                                         taskExecutorProcessSpec
                                                 .getTotalFlinkMemorySize()
-                                                .multiply(fraction))));
+                                                .multiply(fraction)));
     }
 
     @Test
-    public void testConfigNetworkMemoryLegacyNumOfBuffers() {
+    void testConfigNetworkMemoryLegacyNumOfBuffers() {
         final MemorySize pageSize = MemorySize.parse("32k");
         final int numOfBuffers = 1024;
         final MemorySize networkSize = pageSize.multiply(numOfBuffers);
@@ -499,15 +481,17 @@ public class TaskExecutorProcessUtilsTest
         validateInConfigWithExplicitTaskHeapAndManagedMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        
assertThat(taskExecutorProcessSpec.getNetworkMemSize(), is(networkSize)));
+                        assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+                                .isEqualTo(networkSize));
         validateInConfigurationsWithoutExplicitTaskHeapMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        
assertThat(taskExecutorProcessSpec.getNetworkMemSize(), is(networkSize)));
+                        assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+                                .isEqualTo(networkSize));
     }
 
     @Test
-    public void testConfigManagedMemorySize() {
+    void testConfigManagedMemorySize() {
         final MemorySize managedMemSize = MemorySize.parse("100m");
 
         Configuration conf = new Configuration();
@@ -518,13 +502,12 @@ public class TaskExecutorProcessUtilsTest
         validateInConfigurationsWithoutExplicitManagedMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getManagedMemorySize(),
-                                is(managedMemSize)));
+                        
assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+                                .isEqualTo(managedMemSize));
     }
 
     @Test
-    public void testConfigManagedMemoryLegacySize() {
+    void testConfigManagedMemoryLegacySize() {
         final MemorySize managedMemSize = MemorySize.parse("100m");
 
         @SuppressWarnings("deprecation")
@@ -538,13 +521,12 @@ public class TaskExecutorProcessUtilsTest
         validateInConfigurationsWithoutExplicitManagedMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getManagedMemorySize(),
-                                is(managedMemSize)));
+                        
assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+                                .isEqualTo(managedMemSize));
     }
 
     @Test
-    public void testConfigManagedMemoryFraction() {
+    void testConfigManagedMemoryFraction() {
         final float fraction = 0.5f;
 
         Configuration conf = new Configuration();
@@ -555,16 +537,15 @@ public class TaskExecutorProcessUtilsTest
         validateInConfigurationsWithoutExplicitManagedMem(
                 conf,
                 taskExecutorProcessSpec ->
-                        assertThat(
-                                taskExecutorProcessSpec.getManagedMemorySize(),
-                                is(
+                        
assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+                                .isEqualTo(
                                         taskExecutorProcessSpec
                                                 .getTotalFlinkMemorySize()
-                                                .multiply(fraction))));
+                                                .multiply(fraction)));
     }
 
     @Test
-    public void testConfigManagedMemoryFractionFailure() {
+    void testConfigManagedMemoryFractionFailure() {
         final Configuration conf = new Configuration();
         conf.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, -0.1f);
         validateFailInConfigurationsWithoutExplicitManagedMem(conf);
@@ -574,7 +555,7 @@ public class TaskExecutorProcessUtilsTest
     }
 
     @Test
-    public void testFlinkInternalMemorySizeAddUpFailure() {
+    void testFlinkInternalMemorySizeAddUpFailure() {
         final MemorySize totalFlinkMemory = MemorySize.parse("499m");
         final MemorySize frameworkHeap = MemorySize.parse("100m");
         final MemorySize taskHeap = MemorySize.parse("100m");
@@ -595,7 +576,7 @@ public class TaskExecutorProcessUtilsTest
     }
 
     @Test
-    public void testFlinkInternalMemoryFractionAddUpFailure() {
+    void testFlinkInternalMemoryFractionAddUpFailure() {
         final float networkFraction = 0.6f;
         final float managedFraction = 0.6f;
 
@@ -610,7 +591,7 @@ public class TaskExecutorProcessUtilsTest
     }
 
     @Test
-    public void testConfigTotalProcessMemoryLegacySize() {
+    void testConfigTotalProcessMemoryLegacySize() {
         final MemorySize totalProcessMemorySize = MemorySize.parse("2g");
 
         @SuppressWarnings("deprecation")
@@ -621,25 +602,23 @@ public class TaskExecutorProcessUtilsTest
 
         TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(conf);
-        assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize(), 
is(totalProcessMemorySize));
+        assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize())
+                .isEqualTo(totalProcessMemorySize);
     }
 
     @Test
     public void testExceptionShouldContainRequiredConfigOptions() {
-        try {
-            TaskExecutorProcessUtils.processSpecFromConfig(new 
Configuration());
-        } catch (final IllegalConfigurationException e) {
-            assertThat(e.getMessage(), 
containsString(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
-            assertThat(
-                    e.getMessage(), 
containsString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key()));
-            assertThat(e.getMessage(), 
containsString(TaskManagerOptions.TOTAL_FLINK_MEMORY.key()));
-            assertThat(
-                    e.getMessage(), 
containsString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key()));
-        }
+        assertThatThrownBy(
+                        () -> 
TaskExecutorProcessUtils.processSpecFromConfig(new Configuration()))
+                .isInstanceOf(IllegalConfigurationException.class)
+                
.hasMessageContaining(TaskManagerOptions.TASK_HEAP_MEMORY.key())
+                
.hasMessageContaining(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())
+                
.hasMessageContaining(TaskManagerOptions.TOTAL_FLINK_MEMORY.key())
+                
.hasMessageContaining(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key());
     }
 
     @Test
-    public void testConfigNumSlots() {
+    void testConfigNumSlots() {
         final int numSlots = 5;
 
         Configuration conf = new Configuration();
@@ -647,13 +626,12 @@ public class TaskExecutorProcessUtilsTest
 
         validateInAllConfigurations(
                 conf,
-                taskExecutorProcessSpec -> {
-                    assertThat(taskExecutorProcessSpec.getNumSlots(), 
is(numSlots));
-                });
+                taskExecutorProcessSpec ->
+                        
assertThat(taskExecutorProcessSpec.getNumSlots()).isEqualTo(numSlots));
     }
 
     @Test
-    public void testProcessSpecFromConfigWithExternalResource() {
+    void testProcessSpecFromConfigWithExternalResource() {
         final Configuration config = new Configuration();
         config.setString(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
EXTERNAL_RESOURCE_NAME_1);
@@ -663,14 +641,14 @@ public class TaskExecutorProcessUtilsTest
         config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(4096));
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(config);
-        assertThat(taskExecutorProcessSpec.getExtendedResources().size(), 
is(1));
+        assertThat(taskExecutorProcessSpec.getExtendedResources()).hasSize(1);
         assertThat(
-                taskExecutorProcessSpec
-                        .getExtendedResources()
-                        .get(EXTERNAL_RESOURCE_NAME_1)
-                        .getValue()
-                        .longValue(),
-                is(1L));
+                        taskExecutorProcessSpec
+                                .getExtendedResources()
+                                .get(EXTERNAL_RESOURCE_NAME_1)
+                                .getValue()
+                                .longValue())
+                .isOne();
     }
 
     @Override
@@ -720,8 +698,8 @@ public class TaskExecutorProcessUtilsTest
         config.addAll(customConfig);
         TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(config);
-        assertThat(taskExecutorProcessSpec.getTaskHeapSize(), 
is(TASK_HEAP_SIZE));
-        assertThat(taskExecutorProcessSpec.getManagedMemorySize(), 
is(MANAGED_MEM_SIZE));
+        
assertThat(taskExecutorProcessSpec.getTaskHeapSize()).isEqualTo(TASK_HEAP_SIZE);
+        
assertThat(taskExecutorProcessSpec.getManagedMemorySize()).isEqualTo(MANAGED_MEM_SIZE);
         validateFunc.accept(taskExecutorProcessSpec);
     }
 
@@ -741,7 +719,8 @@ public class TaskExecutorProcessUtilsTest
         config.addAll(customConfig);
         TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(config);
-        assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize(), 
is(TOTAL_FLINK_MEM_SIZE));
+        assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize())
+                .isEqualTo(TOTAL_FLINK_MEM_SIZE);
         validateFunc.accept(taskExecutorProcessSpec);
     }
 
@@ -760,8 +739,9 @@ public class TaskExecutorProcessUtilsTest
         config.addAll(customConfig);
         TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(config);
-        assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize(), 
is(TOTAL_FLINK_MEM_SIZE));
-        assertThat(taskExecutorProcessSpec.getTaskHeapSize(), 
is(TASK_HEAP_SIZE));
+        assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize())
+                .isEqualTo(TOTAL_FLINK_MEM_SIZE);
+        
assertThat(taskExecutorProcessSpec.getTaskHeapSize()).isEqualTo(TASK_HEAP_SIZE);
         validateFunc.accept(taskExecutorProcessSpec);
     }
 
@@ -781,8 +761,9 @@ public class TaskExecutorProcessUtilsTest
         config.addAll(customConfig);
         TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(config);
-        assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize(), 
is(TOTAL_FLINK_MEM_SIZE));
-        assertThat(taskExecutorProcessSpec.getManagedMemorySize(), 
is(MANAGED_MEM_SIZE));
+        assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize())
+                .isEqualTo(TOTAL_FLINK_MEM_SIZE);
+        
assertThat(taskExecutorProcessSpec.getManagedMemorySize()).isEqualTo(MANAGED_MEM_SIZE);
         validateFunc.accept(taskExecutorProcessSpec);
     }
 
@@ -802,7 +783,8 @@ public class TaskExecutorProcessUtilsTest
         config.addAll(customConfig);
         TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromConfig(config);
-        assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize(), 
is(TOTAL_PROCESS_MEM_SIZE));
+        assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize())
+                .isEqualTo(TOTAL_PROCESS_MEM_SIZE);
         validateFunc.accept(taskExecutorProcessSpec);
     }
 
@@ -815,12 +797,9 @@ public class TaskExecutorProcessUtilsTest
 
     @Override
     protected void validateFail(final Configuration config) {
-        try {
-            TaskExecutorProcessUtils.processSpecFromConfig(config);
-            fail("Configuration did not fail as expected.");
-        } catch (IllegalConfigurationException e) {
-            // expected
-        }
+        assertThatExceptionOfType(IllegalConfigurationException.class)
+                .as("Configuration did not fail as expected.")
+                .isThrownBy(() -> 
TaskExecutorProcessUtils.processSpecFromConfig(config));
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
index e448ff0b567..c79c47c1238 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
@@ -20,51 +20,49 @@ package org.apache.flink.runtime.clusterframework.types;
 
 import org.apache.flink.configuration.MemorySize;
 
-import org.hamcrest.Matchers;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ResourceBudgetManager}. */
 public class ResourceBudgetManagerTest {
 
     @Test
-    public void testReserve() {
+    void testReserve() {
         ResourceBudgetManager budgetManager =
                 new ResourceBudgetManager(createResourceProfile(1.0, 100));
 
-        assertThat(budgetManager.reserve(createResourceProfile(0.7, 70)), 
Matchers.is(true));
-        assertThat(budgetManager.getAvailableBudget(), 
is(createResourceProfile(0.3, 30)));
+        assertThat(budgetManager.reserve(createResourceProfile(0.7, 
70))).isEqualTo(true);
+        
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(0.3,
 30));
     }
 
     @Test
-    public void testReserveFail() {
+    void testReserveFail() {
         ResourceBudgetManager budgetManager =
                 new ResourceBudgetManager(createResourceProfile(1.0, 100));
 
-        assertThat(budgetManager.reserve(createResourceProfile(1.2, 120)), 
Matchers.is(false));
-        assertThat(budgetManager.getAvailableBudget(), 
is(createResourceProfile(1.0, 100)));
+        assertThat(budgetManager.reserve(createResourceProfile(1.2, 
120))).isEqualTo(false);
+        
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(1.0,
 100));
     }
 
     @Test
-    public void testRelease() {
+    void testRelease() {
         ResourceBudgetManager budgetManager =
                 new ResourceBudgetManager(createResourceProfile(1.0, 100));
 
-        assertThat(budgetManager.reserve(createResourceProfile(0.7, 70)), 
Matchers.is(true));
-        assertThat(budgetManager.release(createResourceProfile(0.5, 50)), 
Matchers.is(true));
-        assertThat(budgetManager.getAvailableBudget(), 
is(createResourceProfile(0.8, 80)));
+        assertThat(budgetManager.reserve(createResourceProfile(0.7, 
70))).isEqualTo(true);
+        assertThat(budgetManager.release(createResourceProfile(0.5, 
50))).isEqualTo(true);
+        
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(0.8,
 80));
     }
 
     @Test
-    public void testReleaseFail() {
+    void testReleaseFail() {
         ResourceBudgetManager budgetManager =
                 new ResourceBudgetManager(createResourceProfile(1.0, 100));
 
-        assertThat(budgetManager.reserve(createResourceProfile(0.7, 70)), 
Matchers.is(true));
-        assertThat(budgetManager.release(createResourceProfile(0.8, 80)), 
Matchers.is(false));
-        assertThat(budgetManager.getAvailableBudget(), 
is(createResourceProfile(0.3, 30)));
+        assertThat(budgetManager.reserve(createResourceProfile(0.7, 
70))).isEqualTo(true);
+        assertThat(budgetManager.release(createResourceProfile(0.8, 
80))).isEqualTo(false);
+        
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(0.3,
 30));
     }
 
     private static ResourceProfile createResourceProfile(double cpus, int 
memory) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index e760f2e2008..ca2a10d97e4 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -23,35 +23,25 @@ import org.apache.flink.api.common.resources.CPUResource;
 import org.apache.flink.api.common.resources.ExternalResource;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.Matcher;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_CPU_CORE_NUMBER_TO_LOG;
 import static 
org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_MEMORY_SIZE_TO_LOG;
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 /** Tests for the {@link ResourceProfile}. */
-public class ResourceProfileTest extends TestLogger {
+class ResourceProfileTest {
     private static final MemorySize TOO_LARGE_MEMORY =
             MAX_MEMORY_SIZE_TO_LOG.add(MemorySize.ofMebiBytes(10));
     private static final String EXTERNAL_RESOURCE_NAME = "gpu";
 
     @Test
-    public void testAllFieldsNoLessThanProfile() {
+    void testAllFieldsNoLessThanProfile() {
         final ResourceProfile rp1 =
                 ResourceProfile.newBuilder()
                         .setCpuCores(1.0)
@@ -81,19 +71,19 @@ public class ResourceProfileTest extends TestLogger {
                         .setManagedMemoryMB(200)
                         .build();
 
-        assertFalse(rp1.allFieldsNoLessThan(rp2));
-        assertTrue(rp2.allFieldsNoLessThan(rp1));
+        assertThat(rp1.allFieldsNoLessThan(rp2)).isFalse();
+        assertThat(rp2.allFieldsNoLessThan(rp1)).isTrue();
 
-        assertFalse(rp1.allFieldsNoLessThan(rp3));
-        assertTrue(rp3.allFieldsNoLessThan(rp1));
+        assertThat(rp1.allFieldsNoLessThan(rp3)).isFalse();
+        assertThat(rp3.allFieldsNoLessThan(rp1)).isTrue();
 
-        assertFalse(rp2.allFieldsNoLessThan(rp3));
-        assertFalse(rp3.allFieldsNoLessThan(rp2));
+        assertThat(rp2.allFieldsNoLessThan(rp3)).isFalse();
+        assertThat(rp3.allFieldsNoLessThan(rp2)).isFalse();
 
-        assertTrue(rp4.allFieldsNoLessThan(rp1));
-        assertTrue(rp4.allFieldsNoLessThan(rp2));
-        assertTrue(rp4.allFieldsNoLessThan(rp3));
-        assertTrue(rp4.allFieldsNoLessThan(rp4));
+        assertThat(rp4.allFieldsNoLessThan(rp1)).isTrue();
+        assertThat(rp4.allFieldsNoLessThan(rp2)).isTrue();
+        assertThat(rp4.allFieldsNoLessThan(rp3)).isTrue();
+        assertThat(rp4.allFieldsNoLessThan(rp4)).isTrue();
 
         final ResourceProfile rp5 =
                 ResourceProfile.newBuilder()
@@ -103,7 +93,7 @@ public class ResourceProfileTest extends TestLogger {
                         .setManagedMemoryMB(100)
                         .setNetworkMemoryMB(100)
                         .build();
-        assertFalse(rp4.allFieldsNoLessThan(rp5));
+        assertThat(rp4.allFieldsNoLessThan(rp5)).isFalse();
 
         ResourceSpec rs1 =
                 ResourceSpec.newBuilder(1.0, 100)
@@ -114,22 +104,24 @@ public class ResourceProfileTest extends TestLogger {
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.1))
                         .build();
 
-        
assertFalse(rp1.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)));
-        assertTrue(
-                ResourceProfile.fromResourceSpec(rs1)
-                        
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs2)));
-        assertFalse(
-                ResourceProfile.fromResourceSpec(rs2)
-                        
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)));
+        
assertThat(rp1.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1))).isFalse();
+        assertThat(
+                        ResourceProfile.fromResourceSpec(rs1)
+                                
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs2)))
+                .isTrue();
+        assertThat(
+                        ResourceProfile.fromResourceSpec(rs2)
+                                
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)))
+                .isFalse();
     }
 
     @Test
-    public void testUnknownNoLessThanUnknown() {
-        
assertTrue(ResourceProfile.UNKNOWN.allFieldsNoLessThan(ResourceProfile.UNKNOWN));
+    void testUnknownNoLessThanUnknown() {
+        
assertThat(ResourceProfile.UNKNOWN.allFieldsNoLessThan(ResourceProfile.UNKNOWN)).isTrue();
     }
 
     @Test
-    public void testMatchRequirement() {
+    void testMatchRequirement() {
         final ResourceProfile resource1 =
                 ResourceProfile.newBuilder()
                         .setCpuCores(1.0)
@@ -162,20 +154,21 @@ public class ResourceProfileTest extends TestLogger {
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
                         .build();
 
-        assertTrue(resource1.isMatching(requirement1));
-        assertTrue(resource1.isMatching(requirement2));
-        assertFalse(resource1.isMatching(requirement3));
+        assertThat(resource1.isMatching(requirement1)).isTrue();
+        assertThat(resource1.isMatching(requirement2)).isTrue();
+        assertThat(resource1.isMatching(requirement3)).isFalse();
 
-        assertTrue(resource2.isMatching(requirement1));
-        assertFalse(resource2.isMatching(requirement2));
-        assertTrue(resource2.isMatching(requirement3));
+        assertThat(resource2.isMatching(requirement1)).isTrue();
+        assertThat(resource2.isMatching(requirement2)).isFalse();
+        assertThat(resource2.isMatching(requirement3)).isTrue();
     }
 
     @Test
-    public void testEquals() {
+    void testEquals() {
         ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).build();
         ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();
-        assertEquals(ResourceProfile.fromResourceSpec(rs1), 
ResourceProfile.fromResourceSpec(rs2));
+        assertThat(ResourceProfile.fromResourceSpec(rs2))
+                .isEqualTo(ResourceProfile.fromResourceSpec(rs1));
 
         ResourceSpec rs3 =
                 ResourceSpec.newBuilder(1.0, 100)
@@ -185,17 +178,16 @@ public class ResourceProfileTest extends TestLogger {
                 ResourceSpec.newBuilder(1.0, 100)
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.1))
                         .build();
-        assertNotEquals(
-                ResourceProfile.fromResourceSpec(rs3), 
ResourceProfile.fromResourceSpec(rs4));
+        assertThat(ResourceProfile.fromResourceSpec(rs4))
+                .isNotEqualTo(ResourceProfile.fromResourceSpec(rs3));
 
         ResourceSpec rs5 =
                 ResourceSpec.newBuilder(1.0, 100)
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 2.2))
                         .build();
         MemorySize networkMemory = MemorySize.ofMebiBytes(100);
-        assertEquals(
-                ResourceProfile.fromResourceSpec(rs3, networkMemory),
-                ResourceProfile.fromResourceSpec(rs5, networkMemory));
+        assertThat(ResourceProfile.fromResourceSpec(rs5, networkMemory))
+                .isEqualTo(ResourceProfile.fromResourceSpec(rs3, 
networkMemory));
 
         final ResourceProfile rp1 =
                 ResourceProfile.newBuilder()
@@ -262,33 +254,32 @@ public class ResourceProfileTest extends TestLogger {
                         .setNetworkMemoryMB(100)
                         .build();
 
-        assertNotEquals(rp1, rp2);
-        assertNotEquals(rp1, rp3);
-        assertNotEquals(rp1, rp4);
-        assertNotEquals(rp1, rp5);
-        assertNotEquals(rp1, rp6);
-        assertNotEquals(rp1, rp7);
-        assertEquals(rp1, rp8);
+        assertThat(rp2).isNotEqualTo(rp1);
+        assertThat(rp3).isNotEqualTo(rp1);
+        assertThat(rp4).isNotEqualTo(rp1);
+        assertThat(rp5).isNotEqualTo(rp1);
+        assertThat(rp6).isNotEqualTo(rp1);
+        assertThat(rp7).isNotEqualTo(rp1);
+        assertThat(rp8).isEqualTo(rp1);
     }
 
     @Test
-    public void testGet() {
+    void testGet() {
         ResourceSpec rs =
                 ResourceSpec.newBuilder(1.0, 100)
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.6))
                         .build();
         ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, 
MemorySize.ofMebiBytes(50));
 
-        assertEquals(new CPUResource(1.0), rp.getCpuCores());
-        assertEquals(150, rp.getTotalMemory().getMebiBytes());
-        assertEquals(100, rp.getOperatorsMemory().getMebiBytes());
-        assertEquals(
-                new ExternalResource(EXTERNAL_RESOURCE_NAME, 1.6),
-                rp.getExtendedResources().get(EXTERNAL_RESOURCE_NAME));
+        assertThat(rp.getCpuCores()).isEqualTo(new CPUResource(1.0));
+        assertThat(rp.getTotalMemory().getMebiBytes()).isEqualTo(150);
+        assertThat(rp.getOperatorsMemory().getMebiBytes()).isEqualTo(100);
+        assertThat(rp.getExtendedResources().get(EXTERNAL_RESOURCE_NAME))
+                .isEqualTo(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1.6));
     }
 
     @Test
-    public void testMerge() {
+    void testMerge() {
         final ResourceProfile rp1 =
                 ResourceProfile.newBuilder()
                         .setCpuCores(1.0)
@@ -334,22 +325,22 @@ public class ResourceProfileTest extends TestLogger {
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 4.0))
                         .build();
 
-        assertEquals(rp1MergeRp1, rp1.merge(rp1));
-        assertEquals(rp1MergeRp2, rp1.merge(rp2));
-        assertEquals(rp1MergeRp2, rp2.merge(rp1));
-        assertEquals(rp2MergeRp2, rp2.merge(rp2));
+        assertThat(rp1.merge(rp1)).isEqualTo(rp1MergeRp1);
+        assertThat(rp1.merge(rp2)).isEqualTo(rp1MergeRp2);
+        assertThat(rp2.merge(rp1)).isEqualTo(rp1MergeRp2);
+        assertThat(rp2.merge(rp2)).isEqualTo(rp2MergeRp2);
 
-        assertEquals(ResourceProfile.UNKNOWN, 
rp1.merge(ResourceProfile.UNKNOWN));
-        assertEquals(ResourceProfile.UNKNOWN, 
ResourceProfile.UNKNOWN.merge(rp1));
-        assertEquals(
-                ResourceProfile.UNKNOWN, 
ResourceProfile.UNKNOWN.merge(ResourceProfile.UNKNOWN));
-        assertEquals(ResourceProfile.ANY, rp1.merge(ResourceProfile.ANY));
-        assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.merge(rp1));
-        assertEquals(ResourceProfile.ANY, 
ResourceProfile.ANY.merge(ResourceProfile.ANY));
+        
assertThat(rp1.merge(ResourceProfile.UNKNOWN)).isEqualTo(ResourceProfile.UNKNOWN);
+        
assertThat(ResourceProfile.UNKNOWN.merge(rp1)).isEqualTo(ResourceProfile.UNKNOWN);
+        assertThat(ResourceProfile.UNKNOWN.merge(ResourceProfile.UNKNOWN))
+                .isEqualTo(ResourceProfile.UNKNOWN);
+        
assertThat(rp1.merge(ResourceProfile.ANY)).isEqualTo(ResourceProfile.ANY);
+        
assertThat(ResourceProfile.ANY.merge(rp1)).isEqualTo(ResourceProfile.ANY);
+        
assertThat(ResourceProfile.ANY.merge(ResourceProfile.ANY)).isEqualTo(ResourceProfile.ANY);
     }
 
     @Test
-    public void testMergeWithOverflow() {
+    void testMergeWithOverflow() {
         final CPUResource largeDouble = new CPUResource(Double.MAX_VALUE - 
1.0);
         final MemorySize largeMemory = 
MemorySize.MAX_VALUE.subtract(MemorySize.parse("100m"));
 
@@ -386,11 +377,11 @@ public class ResourceProfileTest extends TestLogger {
         } catch (ArithmeticException e) {
             exceptions.add(e);
         }
-        assertEquals(3, exceptions.size());
+        assertThat(exceptions).hasSize(3);
     }
 
     @Test
-    public void testSubtract() {
+    void testSubtract() {
         final ResourceProfile rp1 =
                 ResourceProfile.newBuilder()
                         .setCpuCores(1.0)
@@ -416,28 +407,26 @@ public class ResourceProfileTest extends TestLogger {
                         .setNetworkMemoryMB(300)
                         .build();
 
-        assertEquals(rp1, rp3.subtract(rp2));
-        assertEquals(rp1, rp2.subtract(rp1));
+        assertThat(rp3.subtract(rp2)).isEqualTo(rp1);
+        assertThat(rp2.subtract(rp1)).isEqualTo(rp1);
 
-        try {
-            rp1.subtract(rp2);
-            fail("The subtract should failed due to trying to subtract a 
larger resource");
-        } catch (IllegalArgumentException ex) {
-            // Ignore ex.
-        }
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .as("The subtract should failed due to trying to subtract a 
larger resource")
+                .isThrownBy(() -> rp1.subtract(rp2));
 
-        assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.subtract(rp3));
-        assertEquals(ResourceProfile.ANY, 
ResourceProfile.ANY.subtract(ResourceProfile.ANY));
-        assertEquals(ResourceProfile.ANY, rp3.subtract(ResourceProfile.ANY));
+        
assertThat(ResourceProfile.ANY.subtract(rp3)).isEqualTo(ResourceProfile.ANY);
+        assertThat(ResourceProfile.ANY.subtract(ResourceProfile.ANY))
+                .isEqualTo(ResourceProfile.ANY);
+        
assertThat(rp3.subtract(ResourceProfile.ANY)).isEqualTo(ResourceProfile.ANY);
 
-        assertEquals(ResourceProfile.UNKNOWN, 
ResourceProfile.UNKNOWN.subtract(rp3));
-        assertEquals(ResourceProfile.UNKNOWN, 
rp3.subtract(ResourceProfile.UNKNOWN));
-        assertEquals(
-                ResourceProfile.UNKNOWN, 
ResourceProfile.UNKNOWN.subtract(ResourceProfile.UNKNOWN));
+        
assertThat(ResourceProfile.UNKNOWN.subtract(rp3)).isEqualTo(ResourceProfile.UNKNOWN);
+        
assertThat(rp3.subtract(ResourceProfile.UNKNOWN)).isEqualTo(ResourceProfile.UNKNOWN);
+        assertThat(ResourceProfile.UNKNOWN.subtract(ResourceProfile.UNKNOWN))
+                .isEqualTo(ResourceProfile.UNKNOWN);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testSubtractWithInfValues() {
+    @Test
+    void testSubtractWithInfValues() {
         // Does not equals to ANY since it has extended resources.
         final ResourceProfile rp1 =
                 ResourceProfile.newBuilder()
@@ -457,11 +446,12 @@ public class ResourceProfileTest extends TestLogger {
                         .setNetworkMemoryMB(200)
                         .build();
 
-        rp2.subtract(rp1);
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> rp2.subtract(rp1));
     }
 
     @Test
-    public void testMultiply() {
+    void testMultiply() {
         final int by = 3;
         final ResourceProfile rp1 =
                 ResourceProfile.newBuilder()
@@ -478,11 +468,11 @@ public class ResourceProfileTest extends TestLogger {
             rp2 = rp2.merge(rp1);
         }
 
-        assertEquals(rp2, rp1.multiply(by));
+        assertThat(rp1.multiply(by)).isEqualTo(rp2);
     }
 
     @Test
-    public void testMultiplyZero() {
+    void testMultiplyZero() {
         final ResourceProfile rp1 =
                 ResourceProfile.newBuilder()
                         .setCpuCores(1.0)
@@ -493,10 +483,10 @@ public class ResourceProfileTest extends TestLogger {
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
                         .build();
 
-        assertEquals(ResourceProfile.ZERO, rp1.multiply(0));
+        assertThat(rp1.multiply(0)).isEqualTo(ResourceProfile.ZERO);
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void testMultiplyNegative() {
         final ResourceProfile rp =
                 ResourceProfile.newBuilder()
@@ -507,73 +497,66 @@ public class ResourceProfileTest extends TestLogger {
                         .setManagedMemoryMB(100)
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
                         .build();
-        rp.multiply(-2);
+
+        
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> 
rp.multiply(-2));
     }
 
     @Test
-    public void testFromSpecWithSerializationCopy() throws Exception {
+    void testFromSpecWithSerializationCopy() throws Exception {
         final ResourceSpec copiedSpec =
                 CommonTestUtils.createCopySerializable(ResourceSpec.UNKNOWN);
         final ResourceProfile profile = 
ResourceProfile.fromResourceSpec(copiedSpec);
 
-        assertEquals(ResourceProfile.fromResourceSpec(ResourceSpec.UNKNOWN), 
profile);
+        
assertThat(profile).isEqualTo(ResourceProfile.fromResourceSpec(ResourceSpec.UNKNOWN));
     }
 
     @Test
-    public void testSingletonPropertyOfUnknown() throws Exception {
+    void testSingletonPropertyOfUnknown() throws Exception {
         final ResourceProfile copiedProfile =
                 
CommonTestUtils.createCopySerializable(ResourceProfile.UNKNOWN);
 
-        assertSame(ResourceProfile.UNKNOWN, copiedProfile);
+        assertThat(copiedProfile).isSameAs(ResourceProfile.UNKNOWN);
     }
 
     @Test
-    public void testSingletonPropertyOfAny() throws Exception {
+    void testSingletonPropertyOfAny() throws Exception {
         final ResourceProfile copiedProfile =
                 CommonTestUtils.createCopySerializable(ResourceProfile.ANY);
 
-        assertSame(ResourceProfile.ANY, copiedProfile);
+        assertThat(copiedProfile).isSameAs(ResourceProfile.ANY);
     }
 
     @Test
-    public void doesNotIncludeCPUAndMemoryInToStringIfTheyAreTooLarge() {
+    void doesNotIncludeCPUAndMemoryInToStringIfTheyAreTooLarge() {
         double tooLargeCpuCount = MAX_CPU_CORE_NUMBER_TO_LOG.doubleValue() + 
1.0;
         ResourceProfile resourceProfile = 
createResourceProfile(tooLargeCpuCount, TOO_LARGE_MEMORY);
-        assertThat(
-                resourceProfile.toString(),
-                allOf(not(containsCPUCores()), not(containsTaskHeapMemory())));
+        assertThat(resourceProfile.toString())
+                .doesNotContain("cpuCores=")
+                .doesNotContain("taskHeapMemory=");
     }
 
     @Test
-    public void includesCPUAndMemoryInToStringIfTheyAreBelowThreshold() {
+    void includesCPUAndMemoryInToStringIfTheyAreBelowThreshold() {
         ResourceProfile resourceProfile = createResourceProfile(1.0, 
MemorySize.ofMebiBytes(4));
-        assertThat(resourceProfile.toString(), allOf(containsCPUCores(), 
containsTaskHeapMemory()));
+        
assertThat(resourceProfile.toString()).contains("cpuCores=").contains("taskHeapMemory=");
     }
 
     @Test
-    public void testZeroExtendedResourceFromConstructor() {
+    void testZeroExtendedResourceFromConstructor() {
         final ResourceProfile resourceProfile =
                 ResourceProfile.newBuilder()
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 0.0))
                         .build();
-        assertEquals(resourceProfile.getExtendedResources().size(), 0);
+        assertThat(resourceProfile.getExtendedResources()).isEmpty();
     }
 
     @Test
-    public void testZeroExtendedResourceFromSubtract() {
+    void testZeroExtendedResourceFromSubtract() {
         final ResourceProfile resourceProfile =
                 ResourceProfile.newBuilder()
                         .setExtendedResource(new 
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
                         .build();
-        
assertEquals(resourceProfile.subtract(resourceProfile).getExtendedResources().size(),
 0);
-    }
-
-    private Matcher<String> containsTaskHeapMemory() {
-        return containsString("taskHeapMemory=");
-    }
-
-    private Matcher<String> containsCPUCores() {
-        return containsString("cpuCores=");
+        
assertThat(resourceProfile.subtract(resourceProfile).getExtendedResources()).isEmpty();
     }
 
     private static ResourceProfile createResourceProfile(double cpu, 
MemorySize taskHeapMemory) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index 651275afb70..38b52c29daa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -24,11 +24,12 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtilsTestBase;
 import 
org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemoryUtils;
-import org.apache.flink.testutils.logging.TestLoggerResource;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 
-import org.hamcrest.MatcherAssert;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
 import java.util.HashMap;
@@ -37,23 +38,21 @@ import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.JM_LEGACY_HEAP_OPTIONS;
 import static 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.JM_PROCESS_MEMORY_OPTIONS;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.collection.IsArrayWithSize.arrayWithSize;
-import static org.hamcrest.collection.IsMapContaining.hasKey;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 /** Tests for {@link JobManagerProcessUtils}. */
-public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobManagerProcessSpec> {
+class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobManagerProcessSpec> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerProcessUtilsTest.class);
+
     private static final MemorySize JVM_HEAP_SIZE = MemorySize.parse("1152m");
     private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("1280m");
     private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1536m");
 
-    @Rule
-    public final TestLoggerResource testLoggerResource =
-            new TestLoggerResource(JobManagerFlinkMemoryUtils.class, 
Level.INFO);
+    @RegisterExtension
+    private final LoggerAuditingExtension testLoggerResource =
+            new LoggerAuditingExtension(JobManagerFlinkMemoryUtils.class, 
Level.INFO);
 
     public JobManagerProcessUtilsTest() {
         super(
@@ -63,7 +62,7 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
     }
 
     @Test
-    public void testGenerateDynamicConfigurations() {
+    void testGenerateDynamicConfigurations() {
         Configuration config = new Configuration();
         config.set(JobManagerOptions.JVM_HEAP_MEMORY, MemorySize.parse("1m"));
         config.set(JobManagerOptions.OFF_HEAP_MEMORY, MemorySize.parse("2m"));
@@ -78,21 +77,16 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
         Map<String, String> configs =
                 
parseAndAssertJobManagerResourceDynamicConfig(dynamicConfigsStr);
 
-        assertThat(
-                
MemorySize.parse(configs.get(JobManagerOptions.JVM_HEAP_MEMORY.key())),
-                is(jobManagerProcessSpec.getJvmHeapMemorySize()));
-        assertThat(
-                
MemorySize.parse(configs.get(JobManagerOptions.OFF_HEAP_MEMORY.key())),
-                is(jobManagerProcessSpec.getJvmDirectMemorySize()));
-        assertThat(
-                
MemorySize.parse(configs.get(JobManagerOptions.JVM_METASPACE.key())),
-                is(jobManagerProcessSpec.getJvmMetaspaceSize()));
-        assertThat(
-                
MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MIN.key())),
-                is(jobManagerProcessSpec.getJvmOverheadSize()));
-        assertThat(
-                
MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MAX.key())),
-                is(jobManagerProcessSpec.getJvmOverheadSize()));
+        
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_HEAP_MEMORY.key())))
+                .isEqualTo(jobManagerProcessSpec.getJvmHeapMemorySize());
+        
assertThat(MemorySize.parse(configs.get(JobManagerOptions.OFF_HEAP_MEMORY.key())))
+                .isEqualTo(jobManagerProcessSpec.getJvmDirectMemorySize());
+        
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_METASPACE.key())))
+                .isEqualTo(jobManagerProcessSpec.getJvmMetaspaceSize());
+        
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MIN.key())))
+                .isEqualTo(jobManagerProcessSpec.getJvmOverheadSize());
+        
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MAX.key())))
+                .isEqualTo(jobManagerProcessSpec.getJvmOverheadSize());
     }
 
     private static Map<String, String> 
parseAndAssertJobManagerResourceDynamicConfig(
@@ -100,29 +94,29 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
         Map<String, String> config = new HashMap<>();
         String[] dynamicParameterTokens = dynamicParameterStr.split(" ");
 
-        assertThat(dynamicParameterTokens.length % 2, is(0));
+        assertThat(dynamicParameterTokens.length % 2).isZero();
         for (int i = 0; i < dynamicParameterTokens.length; ++i) {
             String configStr = dynamicParameterTokens[i];
             if (i % 2 == 0) {
-                assertThat(configStr, is("-D"));
+                assertThat(configStr).isEqualTo("-D");
             } else {
                 String[] configEntry = configStr.split("=");
-                assertThat(configEntry, arrayWithSize(2));
+                assertThat(configEntry).hasSize(2);
                 config.put(configEntry[0], configEntry[1]);
             }
         }
 
-        assertThat(config, hasKey(JobManagerOptions.JVM_HEAP_MEMORY.key()));
-        assertThat(config, hasKey(JobManagerOptions.OFF_HEAP_MEMORY.key()));
-        assertThat(config, hasKey(JobManagerOptions.JVM_METASPACE.key()));
-        assertThat(config, hasKey(JobManagerOptions.JVM_OVERHEAD_MIN.key()));
-        assertThat(config, hasKey(JobManagerOptions.JVM_OVERHEAD_MAX.key()));
+        
assertThat(config).containsKey(JobManagerOptions.JVM_HEAP_MEMORY.key());
+        
assertThat(config).containsKey(JobManagerOptions.OFF_HEAP_MEMORY.key());
+        assertThat(config).containsKey(JobManagerOptions.JVM_METASPACE.key());
+        
assertThat(config).containsKey(JobManagerOptions.JVM_OVERHEAD_MIN.key());
+        
assertThat(config).containsKey(JobManagerOptions.JVM_OVERHEAD_MAX.key());
 
         return config;
     }
 
     @Test
-    public void testConfigJvmHeapMemory() {
+    void testConfigJvmHeapMemory() {
         MemorySize jvmHeapSize = MemorySize.parse("50m");
 
         Configuration conf = new Configuration();
@@ -130,30 +124,31 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
         JobManagerProcessSpec jobManagerProcessSpec =
                 JobManagerProcessUtils.processSpecFromConfig(conf);
-        assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(), 
is(jvmHeapSize));
+        
assertThat(jobManagerProcessSpec.getJvmHeapMemorySize()).isEqualTo(jvmHeapSize);
     }
 
     @Test
-    public void testLogFailureOfJvmHeapSizeMinSizeVerification() {
+    void testLogFailureOfJvmHeapSizeMinSizeVerification() {
         MemorySize jvmHeapMemory = MemorySize.parse("50m");
 
         Configuration conf = new Configuration();
         conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
 
         JobManagerProcessUtils.processSpecFromConfig(conf);
-        MatcherAssert.assertThat(
-                testLoggerResource.getMessages(),
-                hasItem(
-                        containsString(
-                                String.format(
-                                        "The configured or derived JVM heap 
memory size (%s) is less than its recommended minimum value (%s)",
-                                        jvmHeapMemory.toHumanReadableString(),
-                                        JobManagerOptions.MIN_JVM_HEAP_SIZE
-                                                .toHumanReadableString()))));
+
+        assertThat(testLoggerResource.getMessages())
+                .anyMatch(
+                        str ->
+                                str.contains(
+                                        String.format(
+                                                "The configured or derived JVM 
heap memory size (%s) is less than its recommended minimum value (%s)",
+                                                
jvmHeapMemory.toHumanReadableString(),
+                                                
JobManagerOptions.MIN_JVM_HEAP_SIZE
+                                                        
.toHumanReadableString())));
     }
 
     @Test
-    public void testConfigOffHeapMemory() {
+    void testConfigOffHeapMemory() {
         MemorySize offHeapMemory = MemorySize.parse("100m");
 
         Configuration conf = new Configuration();
@@ -162,12 +157,12 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
         validateInAllConfigurationsWithoutExplicitTotalFlinkAndJvmHeapMem(
                 conf,
                 jobManagerProcessSpec ->
-                        assertThat(
-                                
jobManagerProcessSpec.getJvmDirectMemorySize(), is(offHeapMemory)));
+                        
assertThat(jobManagerProcessSpec.getJvmDirectMemorySize())
+                                .isEqualTo(offHeapMemory));
     }
 
     @Test
-    public void testFlinkInternalMemorySizeAddUpFailure() {
+    void testFlinkInternalMemorySizeAddUpFailure() {
         MemorySize totalFlinkMemory = MemorySize.parse("199m");
         MemorySize jvmHeap = MemorySize.parse("100m");
         MemorySize offHeapMemory = MemorySize.parse("100m");
@@ -181,7 +176,7 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
     }
 
     @Test
-    public void testJvmHeapExceedsTotalFlinkMemoryFailure() {
+    void testJvmHeapExceedsTotalFlinkMemoryFailure() {
         MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(100);
         MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
 
@@ -193,7 +188,7 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
     }
 
     @Test
-    public void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
+    void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
         MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
         MemorySize defaultOffHeap = 
JobManagerOptions.OFF_HEAP_MEMORY.defaultValue();
         MemorySize expectedOffHeap = 
MemorySize.ofMebiBytes(100).add(defaultOffHeap);
@@ -205,22 +200,22 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
         JobManagerProcessSpec jobManagerProcessSpec =
                 JobManagerProcessUtils.processSpecFromConfig(conf);
-        assertThat(jobManagerProcessSpec.getJvmDirectMemorySize(), 
is(expectedOffHeap));
-        MatcherAssert.assertThat(
-                testLoggerResource.getMessages(),
-                hasItem(
-                        containsString(
-                                String.format(
-                                        "The Off-Heap Memory size (%s) is 
derived the configured Total Flink Memory size (%s) minus "
-                                                + "the configured JVM Heap 
Memory size (%s). The default Off-Heap Memory size (%s) is ignored.",
-                                        
expectedOffHeap.toHumanReadableString(),
-                                        
totalFlinkMemory.toHumanReadableString(),
-                                        jvmHeap.toHumanReadableString(),
-                                        
defaultOffHeap.toHumanReadableString()))));
+        
assertThat(jobManagerProcessSpec.getJvmDirectMemorySize()).isEqualTo(expectedOffHeap);
+        assertThat(testLoggerResource.getMessages())
+                .anyMatch(
+                        str ->
+                                str.contains(
+                                        String.format(
+                                                "The Off-Heap Memory size (%s) 
is derived the configured Total Flink Memory size (%s) minus "
+                                                        + "the configured JVM 
Heap Memory size (%s). The default Off-Heap Memory size (%s) is ignored.",
+                                                
expectedOffHeap.toHumanReadableString(),
+                                                
totalFlinkMemory.toHumanReadableString(),
+                                                
jvmHeap.toHumanReadableString(),
+                                                
defaultOffHeap.toHumanReadableString())));
     }
 
     @Test
-    public void testDeriveFromRequiredFineGrainedOptions() {
+    void testDeriveFromRequiredFineGrainedOptions() {
         MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
         MemorySize offHeap = MemorySize.ofMebiBytes(50);
         MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(200);
@@ -233,7 +228,7 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
         JobManagerProcessSpec jobManagerProcessSpec =
                 JobManagerProcessUtils.processSpecFromConfig(conf);
-        assertThat(jobManagerProcessSpec.getJvmDirectMemorySize(), 
is(expectedOffHeap));
+        
assertThat(jobManagerProcessSpec.getJvmDirectMemorySize()).isEqualTo(expectedOffHeap);
     }
 
     @Override
@@ -274,17 +269,17 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
     private void validateInConfigWithExplicitJvmHeap(
             Configuration customConfig, Consumer<JobManagerProcessSpec> 
validateFunc) {
-        log.info("Validating in configuration with explicit jvm heap.");
+        LOG.info("Validating in configuration with explicit jvm heap.");
         Configuration config = configWithExplicitJvmHeap();
         config.addAll(customConfig);
         JobManagerProcessSpec jobManagerProcessSpec =
                 JobManagerProcessUtils.processSpecFromConfig(config);
-        assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(), 
is(JVM_HEAP_SIZE));
+        
assertThat(jobManagerProcessSpec.getJvmHeapMemorySize()).isEqualTo(JVM_HEAP_SIZE);
         validateFunc.accept(jobManagerProcessSpec);
     }
 
     private void validateFailInConfigWithExplicitJvmHeap(Configuration 
customConfig) {
-        log.info("Validating failing in configuration with explicit jvm 
heap.");
+        LOG.info("Validating failing in configuration with explicit jvm 
heap.");
         Configuration config = configWithExplicitJvmHeap();
         config.addAll(customConfig);
         validateFail(config);
@@ -292,17 +287,17 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
     private void validateInConfigWithExplicitTotalFlinkMem(
             Configuration customConfig, Consumer<JobManagerProcessSpec> 
validateFunc) {
-        log.info("Validating in configuration with explicit total flink memory 
size.");
+        LOG.info("Validating in configuration with explicit total flink memory 
size.");
         Configuration config = configWithExplicitTotalFlinkMem();
         config.addAll(customConfig);
         JobManagerProcessSpec jobManagerProcessSpec =
                 JobManagerProcessUtils.processSpecFromConfig(config);
-        assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize(), 
is(TOTAL_FLINK_MEM_SIZE));
+        
assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize()).isEqualTo(TOTAL_FLINK_MEM_SIZE);
         validateFunc.accept(jobManagerProcessSpec);
     }
 
     private void validateFailInConfigWithExplicitTotalFlinkMem(Configuration 
customConfig) {
-        log.info("Validating failing in configuration with explicit total 
flink memory size.");
+        LOG.info("Validating failing in configuration with explicit total 
flink memory size.");
         Configuration config = configWithExplicitTotalFlinkMem();
         config.addAll(customConfig);
         validateFail(config);
@@ -310,19 +305,19 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
     private void validateInConfigWithExplicitTotalFlinkAndJvmHeapMem(
             Configuration customConfig, Consumer<JobManagerProcessSpec> 
validateFunc) {
-        log.info("Validating in configuration with explicit total flink and 
jvm heap memory size.");
+        LOG.info("Validating in configuration with explicit total flink and 
jvm heap memory size.");
         Configuration config = configWithExplicitTotalFlinkAndJvmHeapMem();
         config.addAll(customConfig);
         JobManagerProcessSpec jobManagerProcessSpec =
                 JobManagerProcessUtils.processSpecFromConfig(config);
-        assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize(), 
is(TOTAL_FLINK_MEM_SIZE));
-        assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(), 
is(JVM_HEAP_SIZE));
+        
assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize()).isEqualTo(TOTAL_FLINK_MEM_SIZE);
+        
assertThat(jobManagerProcessSpec.getJvmHeapMemorySize()).isEqualTo(JVM_HEAP_SIZE);
         validateFunc.accept(jobManagerProcessSpec);
     }
 
     private void validateFailInConfigWithExplicitTotalFlinkAndJvmHeapMem(
             Configuration customConfig) {
-        log.info(
+        LOG.info(
                 "Validating failing in configuration with explicit total flink 
and jvm heap memory size.");
         Configuration config = configWithExplicitTotalFlinkAndJvmHeapMem();
         config.addAll(customConfig);
@@ -331,17 +326,18 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
     private void validateInConfigWithExplicitTotalProcessMem(
             Configuration customConfig, Consumer<JobManagerProcessSpec> 
validateFunc) {
-        log.info("Validating in configuration with explicit total process 
memory size.");
+        LOG.info("Validating in configuration with explicit total process 
memory size.");
         Configuration config = configWithExplicitTotalProcessMem();
         config.addAll(customConfig);
         JobManagerProcessSpec jobManagerProcessSpec =
                 JobManagerProcessUtils.processSpecFromConfig(config);
-        assertThat(jobManagerProcessSpec.getTotalProcessMemorySize(), 
is(TOTAL_PROCESS_MEM_SIZE));
+        assertThat(jobManagerProcessSpec.getTotalProcessMemorySize())
+                .isEqualTo(TOTAL_PROCESS_MEM_SIZE);
         validateFunc.accept(jobManagerProcessSpec);
     }
 
     private void validateFailInConfigWithExplicitTotalProcessMem(Configuration 
customConfig) {
-        log.info("Validating failing in configuration with explicit total 
process memory size.");
+        LOG.info("Validating failing in configuration with explicit total 
process memory size.");
         Configuration config = configWithExplicitTotalProcessMem();
         config.addAll(customConfig);
         validateFail(config);
@@ -349,12 +345,9 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
 
     @Override
     protected void validateFail(Configuration config) {
-        try {
-            JobManagerProcessUtils.processSpecFromConfig(config);
-            fail("Configuration did not fail as expected.");
-        } catch (IllegalConfigurationException e) {
-            // expected
-        }
+        assertThatExceptionOfType(IllegalConfigurationException.class)
+                .as("Configuration did not fail as expected.")
+                .isThrownBy(() -> 
JobManagerProcessUtils.processSpecFromConfig(config));
     }
 
     @Override


Reply via email to