RyanSkraba commented on a change in pull request #18928:
URL: https://github.com/apache/flink/pull/18928#discussion_r826191132



##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -26,14 +26,15 @@
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.Serializable;
 
-import static junit.framework.TestCase.fail;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;

Review comment:
       ```suggestion
   import static org.assertj.core.api.Assertions.fail;
   ```
   These isn't really important, just consistency with using assertions only 
from AssertJ.  (There's a bunch of these in the PR)

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendITCase.java
##########
@@ -23,33 +23,32 @@
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.time.Duration;
 import java.util.Collections;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration tests for {@link CliFrontend}. */
-public class CliFrontendITCase {
+class CliFrontendITCase {
 
     private PrintStream originalPrintStream;
 
     private ByteArrayOutputStream testOutputStream;
 
-    @Before
-    public void before() {
+    @BeforeEach
+    void before() {
         originalPrintStream = System.out;
         testOutputStream = new ByteArrayOutputStream();
         System.setOut(new PrintStream(testOutputStream));
     }
 
-    @After
+    @AfterEach
     public void finalize() {

Review comment:
       ```suggestion
       void tearDown() {
   ```
   `finalize()` has a specific meaning in Java.  It looks like `tearDown` is 
often used.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java
##########
@@ -67,14 +78,14 @@ public void testShowExecutionPlan() throws Exception {
             CliFrontend testFrontend =
                     new CliFrontend(configuration, 
Collections.singletonList(getCli()));
             testFrontend.info(parameters);
-            assertTrue(buffer.toString().contains("\"parallelism\" : 4"));
+            assertThat(buffer.toString().contains("\"parallelism\" : 
4")).isTrue();

Review comment:
       ```suggestion
               assertThat(buffer.toString()).contains("\"parallelism\" : 4");
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -84,7 +86,7 @@ public void 
testWaitUntilJobInitializationFinished_doesNotThrowRuntimeException(
 
     /** Ensure that other errors are thrown. */
     @Test
-    public void testWaitUntilJobInitializationFinished_throwsOtherErrors() {
+    void testWaitUntilJobInitializationFinished_throwsOtherErrors() {
         CommonTestUtils.assertThrows(

Review comment:
       Throughout this migration -- what do  you think about dropping 
`CommonTestUtils.assertThrows` (maybe deprecating it?)   It doesn't add any 
value over AssertJ `assertThatThrownBy`, and this seems like the right time to 
do it.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -118,9 +125,10 @@ public void testTriggerSavepointFailure() throws Exception 
{
 
                 fail("Savepoint should have failed.");

Review comment:
       I think we could simplify this logic using AssertJ and it's exception 
handling.  Instead of a double nested try, it could be replaced with:
   
   ```
           try (ClusterClient<String> clusterClient = 
createFailingClusterClient(testException)) {
               MockedCliFrontend frontend = new 
MockedCliFrontend(clusterClient);
   
               String[] parameters = {jobId.toString()};
   
               assertThatThrownBy(() -> frontend.savepoint(parameters))
                       .isInstanceOf(FlinkException.class)
                       .hasRootCause(testException);
           } finally {
               restoreStdOutAndStdErr();
           }
   ```
   

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -139,12 +147,13 @@ public void testTriggerSavepointFailureIllegalJobID() 
throws Exception {
                                     getConfiguration(), 
StandaloneClusterId.getInstance()));
 
             String[] parameters = {"invalid job id"};
-            try {
-                frontend.savepoint(parameters);
-                fail("Should have failed.");
-            } catch (CliArgsException e) {
-                assertThat(e.getMessage(), Matchers.containsString("Cannot 
parse JobID"));
-            }
+            assertThatThrownBy(
+                            () -> {
+                                frontend.savepoint(parameters);
+                                fail("Should have failed.");

Review comment:
       This `fail` line can be deleted now.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
##########
@@ -28,34 +28,42 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for the STOP command. */
-public class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
+class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
 
-    @BeforeClass
-    public static void setup() {
+    @BeforeAll
+    static void setup() {
         CliFrontendTestUtils.pipeSystemOutToNull();
     }
 
-    @AfterClass
-    public static void shutdown() {
+    @AfterAll
+    static void shutdown() {
         CliFrontendTestUtils.restoreSystemOut();
     }
 
+    private static Stream<Arguments> testArguments() {

Review comment:
       As above, consider using a `@CsvSource` on the test that uses these.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
##########
@@ -59,7 +54,7 @@ public void testStandaloneClusterClientFactoryDiscovery() {
 
         ClusterClientFactory<StandaloneClusterId> factory =
                 serviceLoaderUnderTest.getClusterClientFactory(config);
-        assertTrue(factory instanceof StandaloneClientFactory);
+        assertThat(factory instanceof StandaloneClientFactory).isTrue();

Review comment:
       ```suggestion
           assertThat(factory).isInstanceOf(StandaloneClientFactory.class);
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProviderTest.java
##########
@@ -18,161 +18,195 @@
 
 package org.apache.flink.client.deployment.application;
 
-import org.apache.flink.client.testjar.ClasspathProvider;
+import org.apache.flink.client.testjar.ClasspathProviderExtension;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.commons.io.FilenameUtils;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
-import org.hamcrest.core.IsCollectionContaining;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Collection;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * {@code FromClasspathEntryClassInformationProviderTest} tests {@link
  * FromClasspathEntryClassInformationProvider}.
  */
-public class FromClasspathEntryClassInformationProviderTest extends TestLogger 
{
+@ExtendWith(TestLoggerExtension.class)
+class FromClasspathEntryClassInformationProviderTest {
 
-    @Rule
-    public ClasspathProvider noEntryClassClasspathProvider =
-            ClasspathProvider.createWithNoEntryClass();
+    @RegisterExtension
+    ClasspathProviderExtension noEntryClassClasspathProvider =
+            ClasspathProviderExtension.createWithNoEntryClass();
 
-    @Rule
-    public ClasspathProvider singleEntryClassClasspathProvider =
-            ClasspathProvider.createWithSingleEntryClass();
+    @RegisterExtension
+    ClasspathProviderExtension singleEntryClassClasspathProvider =
+            ClasspathProviderExtension.createWithSingleEntryClass();
 
-    @Rule
-    public ClasspathProvider multipleEntryClassesClasspathProvider =
-            ClasspathProvider.createWithMultipleEntryClasses();
+    @RegisterExtension
+    ClasspathProviderExtension multipleEntryClassesClasspathProvider =
+            ClasspathProviderExtension.createWithMultipleEntryClasses();
 
-    @Rule
-    public ClasspathProvider testJobEntryClassClasspathProvider =
-            ClasspathProvider.createWithTestJobOnly();
+    @RegisterExtension
+    ClasspathProviderExtension testJobEntryClassClasspathProvider =
+            ClasspathProviderExtension.createWithTestJobOnly();
 
-    @Rule
-    public ClasspathProvider onlyTextFileClasspathProvider =
-            ClasspathProvider.createWithTextFileOnly();
+    @RegisterExtension
+    ClasspathProviderExtension onlyTextFileClasspathProvider =
+            ClasspathProviderExtension.createWithTextFileOnly();
 
     @Test
-    public void testJobClassOnUserClasspathWithExplicitJobClassName()
-            throws IOException, FlinkException {
+    void testJobClassOnUserClasspathWithExplicitJobClassName() throws 
IOException, FlinkException {
         FromClasspathEntryClassInformationProvider testInstance =
                 FromClasspathEntryClassInformationProvider.create(
                         singleEntryClassClasspathProvider.getJobClassName(),
                         
singleEntryClassClasspathProvider.getURLUserClasspath());
 
-        assertThat(testInstance.getJobClassName().isPresent(), is(true));
-        assertThat(
-                testInstance.getJobClassName().get(),
-                is(singleEntryClassClasspathProvider.getJobClassName()));
-        assertThat(testInstance.getJarFile().isPresent(), is(false));
+        assertThat(testInstance.getJobClassName().isPresent()).isTrue();
+        assertThat(testInstance.getJobClassName().get())
+                
.isEqualTo(singleEntryClassClasspathProvider.getJobClassName());
+        assertThat(testInstance.getJarFile().isPresent()).isFalse();
     }
 
-    @Test(expected = FlinkException.class)
-    public void testJobClassOnUserClasspathWithOnlyTestFileOnClasspath()
+    @Test
+    void testJobClassOnUserClasspathWithOnlyTestFileOnClasspath()
             throws IOException, FlinkException {
-        // we want to check that the right exception is thrown if the user 
classpath is empty
-        FromClasspathEntryClassInformationProvider.create(
-                "SomeJobClassName", 
onlyTextFileClasspathProvider.getURLUserClasspath());
+        assertThatThrownBy(
+                        () -> {
+                            // we want to check that the right exception is 
thrown if the user
+                            // classpath is empty
+                            FromClasspathEntryClassInformationProvider.create(
+                                    "SomeJobClassName",
+                                    
onlyTextFileClasspathProvider.getURLUserClasspath());
+                        })
+                .isInstanceOf(FlinkException.class);
     }
 
-    @Test(expected = NullPointerException.class)
-    public void testJobClassOnUserClasspathWithMissingJobClassName()
-            throws IOException, FlinkException {
-        FromClasspathEntryClassInformationProvider.create(
-                null, singleEntryClassClasspathProvider.getURLUserClasspath());
+    @Test
+    void testJobClassOnUserClasspathWithMissingJobClassName() throws 
IOException, FlinkException {
+        assertThatThrownBy(
+                        () -> {
+                            FromClasspathEntryClassInformationProvider.create(
+                                    null, 
singleEntryClassClasspathProvider.getURLUserClasspath());
+                        })
+                .isInstanceOf(NullPointerException.class);
     }
 
-    @Test(expected = NullPointerException.class)
-    public void testJobClassOnUserClasspathWithMissingUserClasspath()
-            throws IOException, FlinkException {
-        FromClasspathEntryClassInformationProvider.create("jobClassName", 
null);
+    @Test
+    void testJobClassOnUserClasspathWithMissingUserClasspath() throws 
IOException, FlinkException {
+        assertThatThrownBy(
+                        () -> {
+                            
FromClasspathEntryClassInformationProvider.create("jobClassName", null);
+                        })
+                .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    public void testJobClassOnUserClasspathWithoutExplicitJobClassName()
+    void testJobClassOnUserClasspathWithoutExplicitJobClassName()
             throws IOException, FlinkException {
         FromClasspathEntryClassInformationProvider testInstance =
                 FromClasspathEntryClassInformationProvider.createFromClasspath(
                         
singleEntryClassClasspathProvider.getURLUserClasspath());
 
-        assertThat(testInstance.getJobClassName().isPresent(), is(true));
-        assertThat(
-                testInstance.getJobClassName().get(),
-                is(singleEntryClassClasspathProvider.getJobClassName()));
-        assertThat(testInstance.getJarFile().isPresent(), is(false));
+        assertThat(testInstance.getJobClassName().isPresent()).isTrue();
+        assertThat(testInstance.getJobClassName().get())
+                
.isEqualTo(singleEntryClassClasspathProvider.getJobClassName());
+        assertThat(testInstance.getJarFile().isPresent()).isFalse();
     }
 
-    @Test(expected = FlinkException.class)
-    public void testMissingJobClassOnUserClasspathWithoutExplicitJobClassName()
+    @Test
+    void testMissingJobClassOnUserClasspathWithoutExplicitJobClassName()
             throws IOException, FlinkException {
-        FromClasspathEntryClassInformationProvider.createFromClasspath(
-                noEntryClassClasspathProvider.getURLUserClasspath());
+        assertThatThrownBy(
+                        () -> {
+                            
FromClasspathEntryClassInformationProvider.createFromClasspath(
+                                    
noEntryClassClasspathProvider.getURLUserClasspath());
+                        })
+                .isInstanceOf(FlinkException.class);
     }
 
-    @Test(expected = FlinkException.class)
-    public void testTooManyMainMethodsOnUserClasspath() throws IOException, 
FlinkException {
-        FromClasspathEntryClassInformationProvider.createFromClasspath(
-                multipleEntryClassesClasspathProvider.getURLUserClasspath());
+    @Test
+    void testTooManyMainMethodsOnUserClasspath() throws IOException, 
FlinkException {
+        assertThatThrownBy(
+                        () -> {
+                            
FromClasspathEntryClassInformationProvider.createFromClasspath(
+                                    
multipleEntryClassesClasspathProvider.getURLUserClasspath());
+                        })
+                .isInstanceOf(FlinkException.class);
     }
 
-    @Test(expected = NullPointerException.class)
-    public void 
testJobClassOnUserClasspathWithoutExplicitJobClassNameAndMissingUserClasspath()
+    @Test
+    void 
testJobClassOnUserClasspathWithoutExplicitJobClassNameAndMissingUserClasspath()
             throws IOException, FlinkException {
-        FromClasspathEntryClassInformationProvider.createFromClasspath(null);
+        assertThatThrownBy(
+                        () -> {
+                            
FromClasspathEntryClassInformationProvider.createFromClasspath(null);
+                        })
+                .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    public void testJobClassNameFromSystemClasspath() throws IOException, 
FlinkException {
+    void testJobClassNameFromSystemClasspath() throws IOException, 
FlinkException {
         singleEntryClassClasspathProvider.setSystemClasspath();
         FromClasspathEntryClassInformationProvider testInstance =
                 
FromClasspathEntryClassInformationProvider.createFromSystemClasspath();
-        assertThat(testInstance.getJobClassName().isPresent(), is(true));
-        assertThat(
-                testInstance.getJobClassName().get(),
-                is(singleEntryClassClasspathProvider.getJobClassName()));
-        assertThat(testInstance.getJarFile().isPresent(), is(false));
+        assertThat(testInstance.getJobClassName().isPresent()).isTrue();
+        assertThat(testInstance.getJobClassName().get())
+                
.isEqualTo(singleEntryClassClasspathProvider.getJobClassName());
+        assertThat(testInstance.getJarFile().isPresent()).isFalse();
     }
 
-    @Test(expected = FlinkException.class)
-    public void testMissingJobClassNameFromSystemClasspath() throws 
IOException, FlinkException {
-        noEntryClassClasspathProvider.setSystemClasspath();
-        FromClasspathEntryClassInformationProvider.createFromSystemClasspath();
+    @Test
+    void testMissingJobClassNameFromSystemClasspath() throws IOException, 
FlinkException {
+        assertThatThrownBy(
+                        () -> {
+                            noEntryClassClasspathProvider.setSystemClasspath();
+                            
FromClasspathEntryClassInformationProvider.createFromSystemClasspath();
+                        })
+                .isInstanceOf(FlinkException.class);
     }
 
-    @Test(expected = FlinkException.class)
-    public void testTooManyMainMethodsOnSystemClasspath() throws IOException, 
FlinkException {
-        multipleEntryClassesClasspathProvider.setSystemClasspath();
-        FromClasspathEntryClassInformationProvider.createFromSystemClasspath();
+    @Test
+    void testTooManyMainMethodsOnSystemClasspath() throws IOException, 
FlinkException {
+        assertThatThrownBy(
+                        () -> {
+                            
multipleEntryClassesClasspathProvider.setSystemClasspath();
+                            
FromClasspathEntryClassInformationProvider.createFromSystemClasspath();
+                        })
+                .isInstanceOf(FlinkException.class);
     }
 
     @Test
-    public void testJarFromSystemClasspathSanityCheck() {
+    void testJarFromSystemClasspathSanityCheck() {
         // Junit executes this test, so it should be returned as part of JARs 
on the classpath
         final Iterable<File> systemClasspath =
                 
FromClasspathEntryClassInformationProvider.extractSystemClasspath();
         assertThat(
-                StreamSupport.stream(systemClasspath.spliterator(), false)
-                        .map(File::getName)
-                        .collect(Collectors.toList()),
-                
IsCollectionContaining.hasItem(CoreMatchers.containsString("junit")));
+                        StreamSupport.stream(systemClasspath.spliterator(), 
false)
+                                .map(File::getName)
+                                .collect(Collectors.toList()))

Review comment:
       You don't need to collect the stream here, you can `assertThat` on a 
Stream<String>

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java
##########
@@ -84,7 +95,7 @@ public void testShowExecutionPlanWithParallelism() {
             CliFrontend testFrontend =
                     new CliFrontend(configuration, 
Collections.singletonList(getCli()));
             testFrontend.info(parameters);
-            assertTrue(buffer.toString().contains("\"parallelism\" : 17"));
+            assertThat(buffer.toString().contains("\"parallelism\" : 
17")).isTrue();

Review comment:
       ```suggestion
               assertThat(buffer.toString()).contains("\"parallelism\" : 17");
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
##########
@@ -25,50 +25,49 @@
 import org.apache.flink.util.ChildFirstClassLoader;
 
 import org.apache.commons.cli.Options;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Collections;
 
 import static 
org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
 import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the RUN command with Dynamic Properties. */
-public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
+class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
 
     private GenericCLI cliUnderTest;
     private Configuration configuration;
 
-    @Rule public TemporaryFolder tmp = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmp;

Review comment:
       `tmp` can be removed here, and passed as a parameter to the setup code: 
`void setup (@TempDir java.nio.file.Path tmp)`.  I usually find this to be a 
bit clearer about the lifecycle of the temporary directory.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java
##########
@@ -20,42 +20,53 @@
 
 import org.apache.flink.configuration.Configuration;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.Collections;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for the "info" command. */
-public class CliFrontendInfoTest extends CliFrontendTestBase {
+class CliFrontendInfoTest extends CliFrontendTestBase {
 
     private static PrintStream stdOut;
     private static PrintStream capture;
     private static ByteArrayOutputStream buffer;
 
-    @Test(expected = CliArgsException.class)
-    public void testMissingOption() throws Exception {
-        String[] parameters = {};
-        Configuration configuration = getConfiguration();
-        CliFrontend testFrontend =
-                new CliFrontend(configuration, 
Collections.singletonList(getCli()));
-        testFrontend.cancel(parameters);
+    @Test
+    void testMissingOption() throws Exception {
+        assertThatThrownBy(
+                        () -> {
+                            String[] parameters = {};
+                            Configuration configuration = getConfiguration();
+                            CliFrontend testFrontend =
+                                    new CliFrontend(
+                                            configuration, 
Collections.singletonList(getCli()));
+                            testFrontend.cancel(parameters);

Review comment:
       ```suggestion
                               testFrontend.info(parameters);
   ```
   This might actually be a copy-paste bug in the original test case!

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
##########
@@ -101,24 +103,24 @@
     private static final String FAIL_MESSAGE =

Review comment:
       This looks like it is only used in unnecessary `fail(...)` calls at the 
end of `assertThatThrownBy` tests.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
##########
@@ -45,37 +45,37 @@
 import static 
org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
 import static 
org.apache.flink.client.cli.CliFrontendTestUtils.getNonJarFilePath;
 import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /** Tests for the RUN command with {@link PackagedProgram PackagedPrograms}. */
-public class CliFrontendPackageProgramTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class CliFrontendPackageProgramTest {
 
     private CliFrontend frontend;
 
-    @BeforeClass
-    public static void init() {
+    @BeforeAll
+    static void init() {
         CliFrontendTestUtils.pipeSystemOutToNull();
     }
 
-    @AfterClass
-    public static void shutdown() {
+    @AfterAll
+    static void shutdown() {
         CliFrontendTestUtils.restoreSystemOut();
     }
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    void setup() throws Exception {
         final Configuration configuration = new Configuration();
         frontend = new CliFrontend(configuration, 
Collections.singletonList(new DefaultCLI()));
     }
 
     @Test
-    public void testNonExistingJarFile() throws Exception {
+    void testNonExistingJarFile() throws Exception {
         ProgramOptions programOptions = mock(ProgramOptions.class);
         
when(programOptions.getJarFilePath()).thenReturn("/some/none/existing/path");
 

Review comment:
       A couple of these tests should be an `assertThatThrownBy` test.

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TempDirUtils.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.UUID;
+
+/**
+ * Util class for implementing common functions in junit4 {@link 
org.junit.rules.TemporaryFolder},
+ * like newFolder() and newFile() for junit5 {@link 
org.junit.jupiter.api.io.TempDir}.
+ */
+public class TempDirUtils {
+
+    private TempDirUtils() {}
+
+    public static Path newFolderIn(@Nonnull Path baseTempPath) {
+        Path newFolder = baseTempPath.resolve(UUID.randomUUID().toString());

Review comment:
       Reading through most of the test cases, it seems that most of these UUID 
folders and files could just be given an explicit, static name (because the 
baseTempPath is already unique). I have specifically found this to be useful 
when stepping through and debugging tests.
   
   Alternatively, we should probably just be using a second `@TempDir`

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
##########
@@ -123,41 +123,38 @@ public void testRun() throws Exception {
                     CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, 
parameters, true);
             ProgramOptions programOptions = ProgramOptions.create(commandLine);
 
-            assertEquals("-arg1", programOptions.getProgramArgs()[0]);
-            assertEquals("value1", programOptions.getProgramArgs()[1]);
-            assertEquals("justavalue", programOptions.getProgramArgs()[2]);
-            assertEquals("--arg2", programOptions.getProgramArgs()[3]);
-            assertEquals("value2", programOptions.getProgramArgs()[4]);
+            assertThat(programOptions.getProgramArgs())

Review comment:
       :+1: 

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendInfoTest.java
##########
@@ -20,42 +20,53 @@
 
 import org.apache.flink.configuration.Configuration;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.Collections;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for the "info" command. */
-public class CliFrontendInfoTest extends CliFrontendTestBase {
+class CliFrontendInfoTest extends CliFrontendTestBase {
 
     private static PrintStream stdOut;
     private static PrintStream capture;
     private static ByteArrayOutputStream buffer;
 
-    @Test(expected = CliArgsException.class)
-    public void testMissingOption() throws Exception {
-        String[] parameters = {};
-        Configuration configuration = getConfiguration();
-        CliFrontend testFrontend =
-                new CliFrontend(configuration, 
Collections.singletonList(getCli()));
-        testFrontend.cancel(parameters);
+    @Test
+    void testMissingOption() throws Exception {
+        assertThatThrownBy(
+                        () -> {
+                            String[] parameters = {};
+                            Configuration configuration = getConfiguration();
+                            CliFrontend testFrontend =
+                                    new CliFrontend(
+                                            configuration, 
Collections.singletonList(getCli()));
+                            testFrontend.cancel(parameters);
+                        })
+                .isInstanceOf(CliArgsException.class);
     }
 
-    @Test(expected = CliArgsException.class)
-    public void testUnrecognizedOption() throws Exception {
-        String[] parameters = {"-v", "-l"};
-        Configuration configuration = getConfiguration();
-        CliFrontend testFrontend =
-                new CliFrontend(configuration, 
Collections.singletonList(getCli()));
-        testFrontend.cancel(parameters);
+    @Test
+    void testUnrecognizedOption() throws Exception {
+        assertThatThrownBy(
+                        () -> {
+                            String[] parameters = {"-v", "-l"};
+                            Configuration configuration = getConfiguration();
+                            CliFrontend testFrontend =
+                                    new CliFrontend(
+                                            configuration, 
Collections.singletonList(getCli()));
+                            testFrontend.cancel(parameters);

Review comment:
       ```suggestion
                               testFrontend.info(parameters);
   ```
   idem

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java
##########
@@ -101,81 +100,93 @@ public void 
testFindEntryClassAssemblerClassAndMainClass() throws IOException {
 
         Optional<String> entry = JarManifestParser.findEntryClass(jarFile);
 
-        assertTrue(entry.isPresent());
-        assertThat(entry.get(), is(equalTo("AssemblerClass")));
+        assertThat(entry.isPresent()).isTrue();
+        assertThat(entry.get()).isEqualTo("AssemblerClass");
     }
 
     @Test
-    public void testFindEntryClassWithTestJobJar() throws IOException {
+    void testFindEntryClassWithTestJobJar() throws IOException {
         File jarFile = TestJob.getTestJobJar();
 
         Optional<String> entryClass = 
JarManifestParser.findEntryClass(jarFile);
 
-        assertTrue(entryClass.isPresent());
-        assertThat(entryClass.get(), 
is(equalTo(TestJob.class.getCanonicalName())));
+        assertThat(entryClass.isPresent()).isTrue();
+        
assertThat(entryClass.get()).isEqualTo(TestJob.class.getCanonicalName());
     }
 
-    @Test(expected = NoSuchElementException.class)
-    public void testFindOnlyEntryClassEmptyArgument() throws IOException {
-        JarManifestParser.findOnlyEntryClass(Collections.emptyList());
+    @Test
+    void testFindOnlyEntryClassEmptyArgument() throws IOException {
+        assertThatThrownBy(
+                        () -> {
+                            
JarManifestParser.findOnlyEntryClass(Collections.emptyList());
+                        })
+                .isInstanceOf(NoSuchElementException.class);
     }
 
-    @Test(expected = NoSuchElementException.class)
-    public void testFindOnlyEntryClassSingleJarWithNoManifest() throws 
IOException {
-        File jarWithNoManifest = createJarFileWithManifest(ImmutableMap.of());
-        
JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarWithNoManifest));
+    @Test
+    void testFindOnlyEntryClassSingleJarWithNoManifest() throws IOException {
+        assertThatThrownBy(
+                        () -> {
+                            File jarWithNoManifest = 
createJarFileWithManifest(ImmutableMap.of());
+                            JarManifestParser.findOnlyEntryClass(
+                                    ImmutableList.of(jarWithNoManifest));
+                        })
+                .isInstanceOf(NoSuchElementException.class);
     }
 
     @Test
-    public void testFindOnlyEntryClassSingleJar() throws IOException {
+    void testFindOnlyEntryClassSingleJar() throws IOException {
         File jarFile = TestJob.getTestJobJar();
 
         JarManifestParser.JarFileWithEntryClass jarFileWithEntryClass =
                 
JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarFile));
 
-        assertThat(
-                jarFileWithEntryClass.getEntryClass(),
-                is(equalTo(TestJob.class.getCanonicalName())));
+        assertThat(jarFileWithEntryClass.getEntryClass())
+                .isEqualTo(TestJob.class.getCanonicalName());
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void 
testFindOnlyEntryClassMultipleJarsWithMultipleManifestEntries() throws 
IOException {
-        File jarFile = TestJob.getTestJobJar();
-
-        JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarFile, 
jarFile, jarFile));
+    @Test
+    void testFindOnlyEntryClassMultipleJarsWithMultipleManifestEntries() 
throws IOException {
+        assertThatThrownBy(
+                        () -> {
+                            File jarFile = TestJob.getTestJobJar();
+                            JarManifestParser.findOnlyEntryClass(
+                                    ImmutableList.of(jarFile, jarFile, 
jarFile));
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testFindOnlyEntryClassMultipleJarsWithSingleManifestEntry() 
throws IOException {
+    void testFindOnlyEntryClassMultipleJarsWithSingleManifestEntry() throws 
IOException {
         File jarWithNoManifest = createJarFileWithManifest(ImmutableMap.of());
         File jarFile = TestJob.getTestJobJar();
 
         JarManifestParser.JarFileWithEntryClass jarFileWithEntryClass =
                 
JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarWithNoManifest, 
jarFile));
 
-        assertThat(
-                jarFileWithEntryClass.getEntryClass(),
-                is(equalTo(TestJob.class.getCanonicalName())));
+        assertThat(jarFileWithEntryClass.getEntryClass())
+                .isEqualTo(TestJob.class.getCanonicalName());
     }
 
     @Test
-    public void testFindFirstManifestAttributeWithNoAttribute() throws 
IOException {
+    void testFindFirstManifestAttributeWithNoAttribute() throws IOException {
         assertThat(
-                
JarManifestParser.findFirstManifestAttribute(TestJob.getTestJobJar()).isPresent(),
-                is(false));
+                        
JarManifestParser.findFirstManifestAttribute(TestJob.getTestJobJar())
+                                .isPresent())
+                .isFalse();
     }
 
     @Test
-    public void testFindFirstManifestAttributeWithAttributes() throws 
IOException {
+    void testFindFirstManifestAttributeWithAttributes() throws IOException {
         Optional<String> optionalValue =
                 JarManifestParser.findFirstManifestAttribute(
                         TestJob.getTestJobJar(), 
PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
-        assertThat(optionalValue.isPresent(), is(true));
-        assertThat(optionalValue.get(), is(TestJobInfo.TEST_JAR_JOB_CLASS));
+        assertThat(optionalValue.isPresent()).isTrue();
+        
assertThat(optionalValue.get()).isEqualTo(TestJobInfo.TEST_JAR_JOB_CLASS);
     }
 
     private File createJarFileWithManifest(Map<String, String> manifest) 
throws IOException {
-        final File jarFile = temporaryFolder.newFile();
+        final File jarFile = newFileIn(temporaryFolder, "test.jar").toFile();

Review comment:
       ```suggestion
           final File jarFile = temporaryFolder.resolve("test.jar").toFile();
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
##########
@@ -340,21 +347,21 @@ public void testGetExecutionPlan() throws 
ProgramInvocationException {
                         PackagedProgramUtils.getPipelineFromProgram(
                                 prg, new Configuration(), 1, true);
         OptimizedPlan op = optimizer.compile(plan);
-        assertNotNull(op);
+        assertThat(op).isNotNull();
 
         PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
-        assertNotNull(dumper.getOptimizerPlanAsJSON(op));
+        assertThat(dumper.getOptimizerPlanAsJSON(op)).isNotNull();
 
         // test HTML escaping
         PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
         dumper2.setEncodeForHTML(true);
         String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
 
-        assertEquals(-1, htmlEscaped.indexOf('\\'));
+        assertThat(htmlEscaped.indexOf('\\')).isEqualTo(-1);

Review comment:
       ```suggestion
           assertThat(htmlEscaped).doesNotContain("\\");
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TempDirUtils.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.UUID;
+
+/**
+ * Util class for implementing common functions in junit4 {@link 
org.junit.rules.TemporaryFolder},
+ * like newFolder() and newFile() for junit5 {@link 
org.junit.jupiter.api.io.TempDir}.
+ */
+public class TempDirUtils {

Review comment:
       Some odd whitespace here!
   
   In general, I'm tempted to say that these functions shouldn't be factored 
out into a Utils.  Idiomatically, using a `@TempDir` parameter in a JUnit5 test 
is pretty straightforward, and creating new Files/Paths where they are used is 
more explicit and readable.
   
   (Just an opinion, however.)

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
##########
@@ -107,13 +101,13 @@ public void 
testDeriveEntryClassInformationFromSystemClasspathWithNonExistingJob
         final EntryClassInformationProvider informationProvider =
                 
DefaultPackagedProgramRetriever.createEntryClassInformationProvider(
                         null, null, jobClassName, new String[0]);
-        assertThat(informationProvider.getJobClassName().isPresent(), 
is(true));
-        assertThat(informationProvider.getJobClassName().get(), 
is(jobClassName));
-        assertThat(informationProvider.getJarFile().isPresent(), is(false));
+        assertThat(informationProvider.getJobClassName().isPresent()).isTrue();
+        
assertThat(informationProvider.getJobClassName().get()).isEqualTo(jobClassName);
+        assertThat(informationProvider.getJarFile().isPresent()).isFalse();

Review comment:
       ```suggestion
           
assertThat(informationProvider.getJobClassName()).get().isEqualTo(jobClassName);
           assertThat(informationProvider.getJarFile()).isEmpty();    }
   ```
   Throughout the class we can use AssertJ Optional assertions.

##########
File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
##########
@@ -43,22 +44,22 @@
  *
  * @see org.apache.flink.client.cli.CliFrontendRunTest
  */
-public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
+class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 
-    @Rule public TemporaryFolder tmp = new TemporaryFolder();
+    @TempDir public File tmp;

Review comment:
       You could delete this here and use it as an argument in the `void 
testRun(@TempDir File tmp)`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
##########
@@ -94,7 +94,7 @@ public UnmodifiableConfiguration getClientConfiguration() {
         return restClusterClientConfig;
     }
 
-    public URI getRestAddres() {
+    public URI getRestAddress() {

Review comment:
       Thanks!  This has been bugging me a bit :D 

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
##########
@@ -434,63 +442,59 @@ public void 
testRetrieveCorrectUserClasspathsWithPipelineClasspaths() throws Exc
                 DefaultPackagedProgramRetriever.create(
                         null,
                         singleEntryClassClasspathProvider.getJobClassName(),
-                        ClasspathProvider.parametersForTestJob("suffix"),
+                        
ClasspathProviderExtension.parametersForTestJob("suffix"),
                         configuration);
         final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new 
Configuration());
-        assertThat(jobGraph.getClasspaths(), 
containsInAnyOrder(expectedMergedURLs.toArray()));
+        assertThat(jobGraph.getClasspaths()).isEqualTo(expectedMergedURLs);

Review comment:
       Is it OK to require the order of expectedMergedURLs?  Obviously the 
tests pass, so it's probably fine.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -271,26 +272,26 @@ public void testDisposeWithJar() throws Exception {
             CliFrontend frontend = new MockedCliFrontend(clusterClient);
 
             // Fake JAR file
-            File f = tmp.newFile();
+            File f = newFileIn(tmp, "test.jar").toFile();

Review comment:
       ```suggestion
               File f = tmp.resolve("test.jar").toFile();
   ```
   This doesn't need to be pre-created, it's set up as an empty zip in the next 
line.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -31,47 +31,55 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import java.util.zip.ZipOutputStream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.util.TempDirUtils.newFileIn;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the SAVEPOINT command. */
-public class CliFrontendSavepointTest extends CliFrontendTestBase {
+class CliFrontendSavepointTest extends CliFrontendTestBase {
 
     private static PrintStream stdOut;
     private static PrintStream stdErr;
     private static ByteArrayOutputStream buffer;
 
-    @Rule public TemporaryFolder tmp = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmp;
+
+    private static Stream<Arguments> testArguments() {

Review comment:
       (See below for a more readable source suggestion)

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
##########
@@ -280,87 +272,108 @@ public void testSavepointRestoreSettings()
                 DefaultPackagedProgramRetriever.create(
                         null,
                         testJobEntryClassClasspathProvider.getJobClassName(),
-                        ClasspathProvider.parametersForTestJob(expectedSuffix),
+                        
ClasspathProviderExtension.parametersForTestJob(expectedSuffix),
                         new Configuration());
 
         final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, 
configuration);
 
-        assertThat(jobGraph.getSavepointRestoreSettings(), 
is(savepointRestoreSettings));
-        assertThat(jobGraph.getJobID(), is(jobId));
+        
assertThat(jobGraph.getSavepointRestoreSettings()).isEqualTo(savepointRestoreSettings);
+        assertThat(jobGraph.getJobID()).isEqualTo(jobId);
     }
 
     @Test
-    public void testFailIfJobDirDoesNotHaveEntryClass() {
-        try {
-            DefaultPackagedProgramRetriever.create(
-                    noEntryClassClasspathProvider.getDirectory(),
-                    testJobEntryClassClasspathProvider.getJobClassName(),
-                    ClasspathProvider.parametersForTestJob("suffix"),
-                    new Configuration());
-            fail("This case should throw exception !");
-        } catch (FlinkException e) {
-            assertThat(
-                    e,
-                    FlinkMatchers.containsMessage(
-                            String.format(
-                                    "Could not find the provided job class 
(%s) in the user lib directory.",
-                                    
testJobEntryClassClasspathProvider.getJobClassName())));
-        }
+    void testFailIfJobDirDoesNotHaveEntryClass() {
+        assertThatThrownBy(
+                        () -> {
+                            DefaultPackagedProgramRetriever.create(
+                                    
noEntryClassClasspathProvider.getDirectory(),
+                                    
testJobEntryClassClasspathProvider.getJobClassName(),
+                                    
ClasspathProviderExtension.parametersForTestJob("suffix"),
+                                    new Configuration());
+                            fail("This case should throw exception !");

Review comment:
       Unnecessary `fail`

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -174,25 +183,17 @@ public void testTriggerSavepointCustomTarget() throws 
Exception {
                     .triggerSavepoint(
                             eq(jobId), eq(savepointDirectory), 
eq(SavepointFormatType.DEFAULT));
 
-            assertTrue(buffer.toString().contains(savepointDirectory));
+            assertThat(buffer.toString()).contains(savepointDirectory);
         } finally {
             clusterClient.close();
 
             restoreStdOutAndStdErr();
         }
     }
 
-    @Test
-    public void testTriggerSavepointCustomFormatShortOption() throws Exception 
{
-        testTriggerSavepointCustomFormat("-t", SavepointFormatType.NATIVE);
-    }
-
-    @Test
-    public void testTriggerSavepointCustomFormatLongOption() throws Exception {
-        testTriggerSavepointCustomFormat("--type", SavepointFormatType.NATIVE);
-    }
-
-    private void testTriggerSavepointCustomFormat(String flag, 
SavepointFormatType formatType)
+    @MethodSource("testArguments")

Review comment:
       ```suggestion
   @CsvSource({"-t, NATIVE", "-type, NATIVE"})
   ```
   You can use the STRING representation of an enum in a `@CsvSource`, which is 
nice to bring the parameters closer to where they are used.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -31,47 +31,55 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import java.util.zip.ZipOutputStream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.util.TempDirUtils.newFileIn;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the SAVEPOINT command. */
-public class CliFrontendSavepointTest extends CliFrontendTestBase {
+class CliFrontendSavepointTest extends CliFrontendTestBase {
 
     private static PrintStream stdOut;
     private static PrintStream stdErr;
     private static ByteArrayOutputStream buffer;
 
-    @Rule public TemporaryFolder tmp = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmp;

Review comment:
       This could be an argument to the `testDisposeWithJar` test.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -312,9 +313,11 @@ public void testDisposeSavepointFailure() throws Exception 
{
 
                 fail("Savepoint should have failed.");
             } catch (Exception e) {
-                assertTrue(

Review comment:
       As above, this double-nested try can be written as an 
`assertThatThrownBy`.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
##########
@@ -261,7 +276,8 @@ public void testUnknownJobId() throws Exception {
             testFrontend.stop(parameters);

Review comment:
       This can be rewritten as an `assertThatThrownBy`.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
##########
@@ -22,19 +22,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for the {@link DefaultClusterClientServiceLoader}. */
 public class ClusterClientServiceLoaderTest {

Review comment:
       ```suggestion
   class ClusterClientServiceLoaderTest {
   ```
   There's quite a few remaining, unnecessary `public` in this class.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
##########
@@ -69,28 +64,36 @@ public void testFactoryDiscovery() {
 
         final ClusterClientFactory<Integer> factory =
                 serviceLoaderUnderTest.getClusterClientFactory(config);
-        assertNotNull(factory);
+        assertThat(factory).isNotNull();
 
         final Integer id = factory.getClusterId(config);
-        assertThat(id, allOf(is(notNullValue()), equalTo(VALID_ID)));
+        assertThat(id).isEqualTo(VALID_ID);
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testMoreThanOneCompatibleFactoriesException() {
-        final Configuration config = new Configuration();
-        config.setString(DeploymentOptions.TARGET, AMBIGUOUS_TARGET);
-
-        serviceLoaderUnderTest.getClusterClientFactory(config);
-        fail();
+        assertThatThrownBy(
+                        () -> {
+                            final Configuration config = new Configuration();
+                            config.setString(DeploymentOptions.TARGET, 
AMBIGUOUS_TARGET);
+
+                            
serviceLoaderUnderTest.getClusterClientFactory(config);
+                            fail();

Review comment:
       This line is unnecessary.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
##########
@@ -31,47 +31,55 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import java.util.zip.ZipOutputStream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.util.TempDirUtils.newFileIn;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the SAVEPOINT command. */
-public class CliFrontendSavepointTest extends CliFrontendTestBase {
+class CliFrontendSavepointTest extends CliFrontendTestBase {
 
     private static PrintStream stdOut;
     private static PrintStream stdErr;
     private static ByteArrayOutputStream buffer;
 
-    @Rule public TemporaryFolder tmp = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmp;
+
+    private static Stream<Arguments> testArguments() {
+        return Stream.of(
+                Arguments.of("-t", SavepointFormatType.NATIVE),
+                Arguments.of("--type", SavepointFormatType.NATIVE));
+    }
 
     // ------------------------------------------------------------------------
     // Trigger savepoint
     // ------------------------------------------------------------------------
 
     @Test
-    public void testTriggerSavepointSuccess() throws Exception {
+    void testTriggerSavepointSuccess() throws Exception {
         replaceStdOutAndStdErr();

Review comment:
       Could this pattern of `replaceStdOutAndStdErr` be replace by a utility 
like `CliFrontendTestUtils.pipeSystemOutToNull` and `@BeforeEach`/`@AfterEach`? 
 Just thinking... 

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProvider.java
##########
@@ -44,7 +44,9 @@
 
 /**
  * {@code ClasspathProvider} offers utility methods for creating a classpath 
based on actual jars.
+ * Use {@link ClasspathProviderExtension}.

Review comment:
       ```suggestion
    * @deprecated Use {@link ClasspathProviderExtension}.
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/FromClasspathEntryClassInformationProviderTest.java
##########
@@ -18,161 +18,195 @@
 
 package org.apache.flink.client.deployment.application;
 
-import org.apache.flink.client.testjar.ClasspathProvider;
+import org.apache.flink.client.testjar.ClasspathProviderExtension;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.commons.io.FilenameUtils;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
-import org.hamcrest.core.IsCollectionContaining;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Collection;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * {@code FromClasspathEntryClassInformationProviderTest} tests {@link
  * FromClasspathEntryClassInformationProvider}.
  */
-public class FromClasspathEntryClassInformationProviderTest extends TestLogger 
{
+@ExtendWith(TestLoggerExtension.class)
+class FromClasspathEntryClassInformationProviderTest {
 
-    @Rule
-    public ClasspathProvider noEntryClassClasspathProvider =
-            ClasspathProvider.createWithNoEntryClass();
+    @RegisterExtension
+    ClasspathProviderExtension noEntryClassClasspathProvider =
+            ClasspathProviderExtension.createWithNoEntryClass();
 
-    @Rule
-    public ClasspathProvider singleEntryClassClasspathProvider =
-            ClasspathProvider.createWithSingleEntryClass();
+    @RegisterExtension
+    ClasspathProviderExtension singleEntryClassClasspathProvider =
+            ClasspathProviderExtension.createWithSingleEntryClass();
 
-    @Rule
-    public ClasspathProvider multipleEntryClassesClasspathProvider =
-            ClasspathProvider.createWithMultipleEntryClasses();
+    @RegisterExtension
+    ClasspathProviderExtension multipleEntryClassesClasspathProvider =
+            ClasspathProviderExtension.createWithMultipleEntryClasses();
 
-    @Rule
-    public ClasspathProvider testJobEntryClassClasspathProvider =
-            ClasspathProvider.createWithTestJobOnly();
+    @RegisterExtension
+    ClasspathProviderExtension testJobEntryClassClasspathProvider =
+            ClasspathProviderExtension.createWithTestJobOnly();
 
-    @Rule
-    public ClasspathProvider onlyTextFileClasspathProvider =
-            ClasspathProvider.createWithTextFileOnly();
+    @RegisterExtension
+    ClasspathProviderExtension onlyTextFileClasspathProvider =
+            ClasspathProviderExtension.createWithTextFileOnly();
 
     @Test
-    public void testJobClassOnUserClasspathWithExplicitJobClassName()
-            throws IOException, FlinkException {
+    void testJobClassOnUserClasspathWithExplicitJobClassName() throws 
IOException, FlinkException {
         FromClasspathEntryClassInformationProvider testInstance =
                 FromClasspathEntryClassInformationProvider.create(
                         singleEntryClassClasspathProvider.getJobClassName(),
                         
singleEntryClassClasspathProvider.getURLUserClasspath());
 
-        assertThat(testInstance.getJobClassName().isPresent(), is(true));
-        assertThat(
-                testInstance.getJobClassName().get(),
-                is(singleEntryClassClasspathProvider.getJobClassName()));
-        assertThat(testInstance.getJarFile().isPresent(), is(false));
+        assertThat(testInstance.getJobClassName().isPresent()).isTrue();
+        assertThat(testInstance.getJobClassName().get())
+                
.isEqualTo(singleEntryClassClasspathProvider.getJobClassName());
+        assertThat(testInstance.getJarFile().isPresent()).isFalse();
     }
 
-    @Test(expected = FlinkException.class)
-    public void testJobClassOnUserClasspathWithOnlyTestFileOnClasspath()
+    @Test
+    void testJobClassOnUserClasspathWithOnlyTestFileOnClasspath()
             throws IOException, FlinkException {

Review comment:
       The `assertThatThrownBy`-wrapped tests won't throw Exceptions, so you 
can optionally remove these.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java
##########
@@ -72,25 +71,25 @@ public void testFindEntryClassAssemblerClass() throws 
IOException {
 
         Optional<String> entry = JarManifestParser.findEntryClass(jarFile);
 
-        assertTrue(entry.isPresent());
-        assertThat(entry.get(), is(equalTo("AssemblerClass")));
+        assertThat(entry.isPresent()).isTrue();
+        assertThat(entry.get()).isEqualTo("AssemblerClass");

Review comment:
       ```suggestion
           assertThat(entry).get().isEqualTo("AssemblerClass");
   ```
   There's a nice syntax for testing for nonempty `Optional` -- note that this 
can get a bit fussy when chaining assertions, since AssertJ "forgets" that the 
optional is a String.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java
##########
@@ -101,81 +100,93 @@ public void 
testFindEntryClassAssemblerClassAndMainClass() throws IOException {
 
         Optional<String> entry = JarManifestParser.findEntryClass(jarFile);
 
-        assertTrue(entry.isPresent());
-        assertThat(entry.get(), is(equalTo("AssemblerClass")));
+        assertThat(entry.isPresent()).isTrue();
+        assertThat(entry.get()).isEqualTo("AssemblerClass");
     }
 
     @Test
-    public void testFindEntryClassWithTestJobJar() throws IOException {
+    void testFindEntryClassWithTestJobJar() throws IOException {
         File jarFile = TestJob.getTestJobJar();
 
         Optional<String> entryClass = 
JarManifestParser.findEntryClass(jarFile);
 
-        assertTrue(entryClass.isPresent());
-        assertThat(entryClass.get(), 
is(equalTo(TestJob.class.getCanonicalName())));
+        assertThat(entryClass.isPresent()).isTrue();
+        
assertThat(entryClass.get()).isEqualTo(TestJob.class.getCanonicalName());
     }
 
-    @Test(expected = NoSuchElementException.class)
-    public void testFindOnlyEntryClassEmptyArgument() throws IOException {
-        JarManifestParser.findOnlyEntryClass(Collections.emptyList());
+    @Test
+    void testFindOnlyEntryClassEmptyArgument() throws IOException {

Review comment:
       Doesn't need to thrown an exception for `assertThatThrownBy`.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java
##########
@@ -95,16 +92,16 @@ public void testHappyPath() throws ExecutionException, 
InterruptedException {
 
             result.join();
 
-            assertThat(jobStatusSupplier.getAttemptCounter(), 
is(equalTo(maxAttemptCounter)));
-            assertTrue(result.isDone() && result.get().isSuccess());
+            
assertThat(jobStatusSupplier.getAttemptCounter()).isEqualTo(maxAttemptCounter);
+            assertThat(result.isDone() && result.get().isSuccess()).isTrue();

Review comment:
       ```suggestion
               
assertThat(result).isCompletedWithValueMatching(JobResult::isSuccess);
   ```
   

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
##########
@@ -101,24 +103,24 @@
     private static final String FAIL_MESSAGE =
             "Invalid program should have thrown ProgramInvocationException.";
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {
 
         ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
         env.generateSequence(1, 1000).output(new DiscardingOutputFormat<>());
         plan = env.createProgramPlan();
 
         config = new Configuration();
         config.setString(JobManagerOptions.ADDRESS, "localhost");
-        NetUtils.Port port = NetUtils.getAvailablePort();
+        port = NetUtils.getAvailablePort();

Review comment:
       Nice catch!

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
##########
@@ -142,112 +144,117 @@ private Configuration fromPackagedProgram(
 
     /** Tests that invalid detached mode programs fail. */
     @Test
-    public void testDetachedMode() throws Exception {
+    void testDetachedMode() throws Exception {
         final ClusterClient<?> clusterClient =
                 new MiniClusterClient(new Configuration(), 
MINI_CLUSTER_RESOURCE.getMiniCluster());
 
-        try {
-            PackagedProgram prg =
-                    PackagedProgram.newBuilder()
-                            .setEntryPointClassName(TestEager.class.getName())
-                            .build();
-            final Configuration configuration = fromPackagedProgram(prg, 1, 
true);
-
-            ClientUtils.executeProgram(
-                    new TestExecutorServiceLoader(clusterClient, plan),
-                    configuration,
-                    prg,
-                    false,
-                    false);
-            fail(FAIL_MESSAGE);
-        } catch (ProgramInvocationException e) {
-            assertEquals(
-                    DetachedJobExecutionResult.DETACHED_MESSAGE
-                            + DetachedJobExecutionResult.JOB_RESULT_MESSAGE
-                            + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-                    e.getCause().getMessage());
-        }
-
-        try {
-            PackagedProgram prg =
-                    PackagedProgram.newBuilder()
-                            
.setEntryPointClassName(TestGetRuntime.class.getName())
-                            .build();
-            final Configuration configuration = fromPackagedProgram(prg, 1, 
true);
-
-            ClientUtils.executeProgram(
-                    new TestExecutorServiceLoader(clusterClient, plan),
-                    configuration,
-                    prg,
-                    false,
-                    false);
-            fail(FAIL_MESSAGE);
-        } catch (ProgramInvocationException e) {
-            assertEquals(
-                    DetachedJobExecutionResult.DETACHED_MESSAGE
-                            + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-                    e.getCause().getMessage());
-        }
-
-        try {
-            PackagedProgram prg =
-                    PackagedProgram.newBuilder()
-                            
.setEntryPointClassName(TestGetAccumulator.class.getName())
-                            .build();
-            final Configuration configuration = fromPackagedProgram(prg, 1, 
true);
-
-            ClientUtils.executeProgram(
-                    new TestExecutorServiceLoader(clusterClient, plan),
-                    configuration,
-                    prg,
-                    false,
-                    false);
-            fail(FAIL_MESSAGE);
-        } catch (ProgramInvocationException e) {
-            assertEquals(
-                    DetachedJobExecutionResult.DETACHED_MESSAGE
-                            + DetachedJobExecutionResult.JOB_RESULT_MESSAGE
-                            + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-                    e.getCause().getMessage());
-        }
-
-        try {
-            PackagedProgram prg =
-                    PackagedProgram.newBuilder()
-                            
.setEntryPointClassName(TestGetAllAccumulator.class.getName())
-                            .build();
-            final Configuration configuration = fromPackagedProgram(prg, 1, 
true);
-
-            ClientUtils.executeProgram(
-                    new TestExecutorServiceLoader(clusterClient, plan),
-                    configuration,
-                    prg,
-                    false,
-                    false);
-            fail(FAIL_MESSAGE);
-        } catch (ProgramInvocationException e) {
-            assertEquals(
-                    DetachedJobExecutionResult.DETACHED_MESSAGE
-                            + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-                    e.getCause().getMessage());
-        }
+        assertThatThrownBy(
+                        () -> {
+                            PackagedProgram prg =
+                                    PackagedProgram.newBuilder()
+                                            
.setEntryPointClassName(TestEager.class.getName())
+                                            .build();
+                            final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+
+                            ClientUtils.executeProgram(
+                                    new 
TestExecutorServiceLoader(clusterClient, plan),
+                                    configuration,
+                                    prg,
+                                    false,
+                                    false);
+                            fail(FAIL_MESSAGE);
+                        })
+                .isInstanceOf(ProgramInvocationException.class)
+                .hasMessageContaining(
+                        DetachedJobExecutionResult.DETACHED_MESSAGE
+                                + DetachedJobExecutionResult.JOB_RESULT_MESSAGE
+                                + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE);
+
+        assertThatThrownBy(
+                        () -> {
+                            PackagedProgram prg =
+                                    PackagedProgram.newBuilder()
+                                            
.setEntryPointClassName(TestGetRuntime.class.getName())
+                                            .build();
+                            final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+
+                            ClientUtils.executeProgram(
+                                    new 
TestExecutorServiceLoader(clusterClient, plan),
+                                    configuration,
+                                    prg,
+                                    false,
+                                    false);
+                            fail(FAIL_MESSAGE);
+                        })
+                .isInstanceOf(ProgramInvocationException.class)
+                .hasMessageContaining(
+                        DetachedJobExecutionResult.DETACHED_MESSAGE
+                                + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE);
+
+        assertThatThrownBy(
+                        () -> {
+                            PackagedProgram prg =
+                                    PackagedProgram.newBuilder()
+                                            .setEntryPointClassName(
+                                                    
TestGetAccumulator.class.getName())
+                                            .build();
+                            final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+
+                            ClientUtils.executeProgram(
+                                    new 
TestExecutorServiceLoader(clusterClient, plan),
+                                    configuration,
+                                    prg,
+                                    false,
+                                    false);
+                            fail(FAIL_MESSAGE);
+                        })
+                .isInstanceOf(ProgramInvocationException.class)
+                .hasMessageContaining(
+                        DetachedJobExecutionResult.DETACHED_MESSAGE
+                                + DetachedJobExecutionResult.JOB_RESULT_MESSAGE
+                                + 
DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE);
+
+        assertThatThrownBy(
+                        () -> {
+                            PackagedProgram prg =
+                                    PackagedProgram.newBuilder()
+                                            .setEntryPointClassName(
+                                                    
TestGetAllAccumulator.class.getName())
+                                            .build();
+                            final Configuration configuration = 
fromPackagedProgram(prg, 1, true);
+
+                            ClientUtils.executeProgram(
+                                    new 
TestExecutorServiceLoader(clusterClient, plan),
+                                    configuration,
+                                    prg,
+                                    false,
+                                    false);
+                            fail(FAIL_MESSAGE);
+                        })
+                .isInstanceOf(ProgramInvocationException.class)
+                .hasMessageContaining(
+                        DetachedJobExecutionResult.DETACHED_MESSAGE
+                                + 
DetachedJobExecutionResult.JOB_RESULT_MESSAGE);
     }
 
-    @Test(expected = FlinkRuntimeException.class)
-    public void testMultiExecuteWithEnforcingSingleJobExecution() throws 
Throwable {
-        try {
-            launchMultiExecuteJob(true);
-        } catch (Exception e) {
-            if (e instanceof ProgramInvocationException) {
-                throw e.getCause();
-            }
-        }
-        fail("Test should have failed due to multiple execute() calls.");
+    @Test
+    void testMultiExecuteWithEnforcingSingleJobExecution() throws Throwable {
+        assertThatThrownBy(
+                        () -> {
+                            try {
+                                launchMultiExecuteJob(true);
+                            } catch (Exception e) {
+                                if (e instanceof ProgramInvocationException) {
+                                    throw e.getCause();
+                                }
+                            }
+                            fail("Test should have failed due to multiple 
execute() calls.");

Review comment:
       Unnecessary `fail`

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/JarManifestParserTest.java
##########
@@ -101,81 +100,93 @@ public void 
testFindEntryClassAssemblerClassAndMainClass() throws IOException {
 
         Optional<String> entry = JarManifestParser.findEntryClass(jarFile);
 
-        assertTrue(entry.isPresent());
-        assertThat(entry.get(), is(equalTo("AssemblerClass")));
+        assertThat(entry.isPresent()).isTrue();
+        assertThat(entry.get()).isEqualTo("AssemblerClass");
     }
 
     @Test
-    public void testFindEntryClassWithTestJobJar() throws IOException {
+    void testFindEntryClassWithTestJobJar() throws IOException {
         File jarFile = TestJob.getTestJobJar();
 
         Optional<String> entryClass = 
JarManifestParser.findEntryClass(jarFile);
 
-        assertTrue(entryClass.isPresent());
-        assertThat(entryClass.get(), 
is(equalTo(TestJob.class.getCanonicalName())));
+        assertThat(entryClass.isPresent()).isTrue();
+        
assertThat(entryClass.get()).isEqualTo(TestJob.class.getCanonicalName());
     }
 
-    @Test(expected = NoSuchElementException.class)
-    public void testFindOnlyEntryClassEmptyArgument() throws IOException {
-        JarManifestParser.findOnlyEntryClass(Collections.emptyList());
+    @Test
+    void testFindOnlyEntryClassEmptyArgument() throws IOException {
+        assertThatThrownBy(
+                        () -> {
+                            
JarManifestParser.findOnlyEntryClass(Collections.emptyList());
+                        })
+                .isInstanceOf(NoSuchElementException.class);
     }
 
-    @Test(expected = NoSuchElementException.class)
-    public void testFindOnlyEntryClassSingleJarWithNoManifest() throws 
IOException {
-        File jarWithNoManifest = createJarFileWithManifest(ImmutableMap.of());
-        
JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarWithNoManifest));
+    @Test
+    void testFindOnlyEntryClassSingleJarWithNoManifest() throws IOException {
+        assertThatThrownBy(
+                        () -> {
+                            File jarWithNoManifest = 
createJarFileWithManifest(ImmutableMap.of());
+                            JarManifestParser.findOnlyEntryClass(
+                                    ImmutableList.of(jarWithNoManifest));
+                        })
+                .isInstanceOf(NoSuchElementException.class);
     }
 
     @Test
-    public void testFindOnlyEntryClassSingleJar() throws IOException {
+    void testFindOnlyEntryClassSingleJar() throws IOException {
         File jarFile = TestJob.getTestJobJar();
 
         JarManifestParser.JarFileWithEntryClass jarFileWithEntryClass =
                 
JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarFile));
 
-        assertThat(
-                jarFileWithEntryClass.getEntryClass(),
-                is(equalTo(TestJob.class.getCanonicalName())));
+        assertThat(jarFileWithEntryClass.getEntryClass())
+                .isEqualTo(TestJob.class.getCanonicalName());
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void 
testFindOnlyEntryClassMultipleJarsWithMultipleManifestEntries() throws 
IOException {
-        File jarFile = TestJob.getTestJobJar();
-
-        JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarFile, 
jarFile, jarFile));
+    @Test
+    void testFindOnlyEntryClassMultipleJarsWithMultipleManifestEntries() 
throws IOException {
+        assertThatThrownBy(
+                        () -> {
+                            File jarFile = TestJob.getTestJobJar();
+                            JarManifestParser.findOnlyEntryClass(
+                                    ImmutableList.of(jarFile, jarFile, 
jarFile));
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testFindOnlyEntryClassMultipleJarsWithSingleManifestEntry() 
throws IOException {
+    void testFindOnlyEntryClassMultipleJarsWithSingleManifestEntry() throws 
IOException {
         File jarWithNoManifest = createJarFileWithManifest(ImmutableMap.of());
         File jarFile = TestJob.getTestJobJar();
 
         JarManifestParser.JarFileWithEntryClass jarFileWithEntryClass =
                 
JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarWithNoManifest, 
jarFile));
 
-        assertThat(
-                jarFileWithEntryClass.getEntryClass(),
-                is(equalTo(TestJob.class.getCanonicalName())));
+        assertThat(jarFileWithEntryClass.getEntryClass())
+                .isEqualTo(TestJob.class.getCanonicalName());
     }
 
     @Test
-    public void testFindFirstManifestAttributeWithNoAttribute() throws 
IOException {
+    void testFindFirstManifestAttributeWithNoAttribute() throws IOException {
         assertThat(
-                
JarManifestParser.findFirstManifestAttribute(TestJob.getTestJobJar()).isPresent(),
-                is(false));
+                        
JarManifestParser.findFirstManifestAttribute(TestJob.getTestJobJar())
+                                .isPresent())
+                .isFalse();

Review comment:
       ```
           
assertThat(JarManifestParser.findFirstManifestAttribute(TestJob.getTestJobJar())).isEmpty();
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
##########
@@ -275,23 +282,23 @@ private void launchMultiExecuteJob(final boolean 
enforceSingleJobExecution)
 
     /** This test verifies correct job submission messaging logic and plan 
translation calls. */
     @Test
-    public void shouldSubmitToJobClient() throws Exception {
+    void shouldSubmitToJobClient() throws Exception {
         final ClusterClient<?> clusterClient =
                 new MiniClusterClient(new Configuration(), 
MINI_CLUSTER_RESOURCE.getMiniCluster());
         JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, new 
Configuration(), 1);
 
         jobGraph.addJars(Collections.emptyList());
         jobGraph.setClasspaths(Collections.emptyList());
 
-        assertNotNull(clusterClient.submitJob(jobGraph).get());
+        assertThat(clusterClient.submitJob(jobGraph).get()).isNotNull();

Review comment:
       ```suggestion
           assertThat(clusterClient.submitJob(jobGraph))
                   
.succeedsWithin(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue())
                   .isNotNull();
   ```
   You might want to use a success test on the Future with a timeout.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java
##########
@@ -125,8 +122,9 @@ public void testFailedJobResult() throws 
ExecutionException, InterruptedExceptio
 
             result.join();
 
-            assertThat(jobStatusSupplier.getAttemptCounter(), 
is(equalTo(maxAttemptCounter)));
-            assertTrue(result.isDone() && 
result.get().getSerializedThrowable().isPresent());
+            
assertThat(jobStatusSupplier.getAttemptCounter()).isEqualTo(maxAttemptCounter);
+            assertThat(result.isDone() && 
result.get().getSerializedThrowable().isPresent())
+                    .isTrue();

Review comment:
       ```suggestion
               assertThat(result)
                       .isCompletedWithValueMatching(
                               jobResult -> 
jobResult.getSerializedThrowable().isPresent());
   ```
   The other test looks clearer using AssertJ `Future` syntax, but this one is 
a matter of opinion!  I'm not sure if there's a nice way to combine the AssertJ 
`Optional` syntax inside the result matcher here.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
##########
@@ -326,7 +333,7 @@ public Void answer(InvocationOnMock invocation) throws 
Throwable {
     }

Review comment:
       :point_up: There's a missed `assertThatThrownBy` here.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
##########
@@ -89,14 +83,14 @@ public void testDeriveEntryClassInformationForCustomJar()
         final EntryClassInformationProvider informationProvider =
                 
DefaultPackagedProgramRetriever.createEntryClassInformationProvider(
                         null, jarFile, jobClassName, new String[0]);
-        assertThat(informationProvider.getJobClassName().isPresent(), 
is(true));
-        assertThat(informationProvider.getJobClassName().get(), 
is(jobClassName));
-        assertThat(informationProvider.getJarFile().isPresent(), is(true));
-        assertThat(informationProvider.getJarFile().get(), is(jarFile));
+        assertThat(informationProvider.getJobClassName().isPresent()).isTrue();
+        
assertThat(informationProvider.getJobClassName().get()).isEqualTo(jobClassName);
+        assertThat(informationProvider.getJarFile().isPresent()).isTrue();
+        assertThat(informationProvider.getJarFile().get()).isEqualTo(jarFile);

Review comment:
       ```suggestion
           
assertThat(informationProvider.getJobClassName()).get().isEqualTo(jobClassName);
           
assertThat(informationProvider.getJarFile()).get().isEqualTo(jarFile);
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
##########
@@ -377,7 +384,7 @@ public void testFailOnForbiddenConfiguration() throws 
ProgramInvocationException
                                             false))
                     .satisfies(

Review comment:
       This `satisfies` clause can be replaced with:
   ```
                       
.hasRootCauseInstanceOf(MutatedConfigurationException.class);
   ```
   
   (It's not exactly the same thing as `findThrowable`, but when it is, we 
should use it.)

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
##########
@@ -434,63 +442,59 @@ public void 
testRetrieveCorrectUserClasspathsWithPipelineClasspaths() throws Exc
                 DefaultPackagedProgramRetriever.create(
                         null,
                         singleEntryClassClasspathProvider.getJobClassName(),
-                        ClasspathProvider.parametersForTestJob("suffix"),
+                        
ClasspathProviderExtension.parametersForTestJob("suffix"),
                         configuration);
         final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new 
Configuration());
-        assertThat(jobGraph.getClasspaths(), 
containsInAnyOrder(expectedMergedURLs.toArray()));
+        assertThat(jobGraph.getClasspaths()).isEqualTo(expectedMergedURLs);
     }
 
     @Test
-    public void testRetrieveFromJarFileWithoutUserLib()
+    void testRetrieveFromJarFileWithoutUserLib()
             throws IOException, FlinkException, ProgramInvocationException {
         final PackagedProgramRetriever retrieverUnderTest =
                 DefaultPackagedProgramRetriever.create(
                         null,
                         testJobEntryClassClasspathProvider.getJobJar(),
                         null,
-                        ClasspathProvider.parametersForTestJob("suffix"),
+                        
ClasspathProviderExtension.parametersForTestJob("suffix"),
                         new Configuration());
         final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new 
Configuration());
 
-        assertThat(
-                jobGraph.getUserJars(),
-                containsInAnyOrder(
+        assertThat(jobGraph.getUserJars())
+                .contains(
                         new org.apache.flink.core.fs.Path(
-                                
testJobEntryClassClasspathProvider.getJobJar().toURI())));
-        assertThat(jobGraph.getClasspaths().isEmpty(), is(true));
+                                
testJobEntryClassClasspathProvider.getJobJar().toURI()));
+        assertThat(jobGraph.getClasspaths().isEmpty()).isTrue();

Review comment:
       ```suggestion
           assertThat(jobGraph.getClasspaths()).isEmpty();
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
##########
@@ -127,17 +123,17 @@ public void testUserClassloaderForConfiguration() throws 
Exception {
         ExecutionConfig executionConfig = 
testParameter.extractExecutionConfig(pipeline);
 
         assertThat(
-                executionConfig
-                        .getDefaultKryoSerializerClasses()
-                        .get(PackagedProgramUtilsPipelineTest.class)
-                        .getName(),
-                is(userSerializerClassName));
+                        executionConfig
+                                .getDefaultKryoSerializerClasses()
+                                .get(PackagedProgramUtilsPipelineTest.class)
+                                .getName())
+                .isEqualTo(userSerializerClassName);
     }
 
     private List<URL> getClassUrls(String className) throws IOException {
         URLClassLoader urlClassLoader =
                 ClassLoaderUtils.compileAndLoadJava(
-                        temporaryFolder.newFolder(),
+                        newFolderIn(temporaryFolder).toFile(),

Review comment:
       ```suggestion
                           temporaryFolder.toFile()
   ```
   It's not necessary to create a new, unique internal folder -- 
`temporaryFolder` will be unique itself every time we arrive here.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
##########
@@ -21,31 +21,34 @@
 import org.apache.flink.client.cli.CliFrontendTestUtils;
 import org.apache.flink.configuration.ConfigConstants;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
 import static 
org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static org.apache.flink.util.TempDirUtils.newFileIn;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link PackagedProgram}. */
-public class PackagedProgramTest {
+class PackagedProgramTest {
 
-    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();

Review comment:
       This can be an annotated argument in the one test that uses it.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
##########
@@ -81,40 +81,42 @@ public void testJobClient() throws Exception {
                         .submitJob(cancellableJobGraph, 
ClassLoader.getSystemClassLoader())
                         .get();
 
-        assertThat(jobClient.getJobID(), is(cancellableJobGraph.getJobID()));
-        assertThat(jobClient.getJobStatus().get(), is(JobStatus.RUNNING));
+        
assertThat(jobClient.getJobID()).isEqualTo(cancellableJobGraph.getJobID());
+        
assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
 
         jobClient.cancel().get();
 
-        assertThrows(
-                "Job was cancelled.",
-                ExecutionException.class,
-                () -> jobClient.getJobExecutionResult().get());
+        assertThatThrownBy(() -> jobClient.getJobExecutionResult().get())
+                .isInstanceOf(ExecutionException.class)
+                .hasMessageContaining("Job was cancelled.");

Review comment:
       ```suggestion
           assertThat(jobClient.getJobExecutionResult())
                   .failsWithin(Duration.ofSeconds(1))
                   .withThrowableOfType(ExecutionException.class)
                   .withMessageContaining("Job was cancelled");
   ```
   Failed `Future` have their own syntax.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
##########
@@ -21,31 +21,34 @@
 import org.apache.flink.client.cli.CliFrontendTestUtils;
 import org.apache.flink.configuration.ConfigConstants;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
 import static 
org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static org.apache.flink.util.TempDirUtils.newFileIn;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link PackagedProgram}. */
-public class PackagedProgramTest {
+class PackagedProgramTest {
 
-    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path temporaryFolder;
 
     @Test
-    public void testExtractContainedLibraries() throws Exception {
+    void testExtractContainedLibraries() throws Exception {
         String s = "testExtractContainedLibraries";
         byte[] nestedJarContent = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
-        File fakeJar = temporaryFolder.newFile("test.jar");
+        Path p = newFileIn(temporaryFolder, "test.jar");
+        File fakeJar = new File(p.toString());

Review comment:
       ```suggestion
           File fakeJar = temporaryFolder.resolve("test.jar").toFile();
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
##########
@@ -20,42 +20,44 @@
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.File;
 import java.net.URISyntaxException;
 
 import static org.apache.flink.client.program.PackagedProgramUtils.resolveURI;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests {@link PackagedProgramUtils}.
  *
  * <p>See also {@link PackagedProgramUtilsPipelineTest} for tests that need to 
test behaviour of
  * {@link DataStream} and {@link DataSet} programs.
  */
-public class PackagedProgramUtilsTest {
+@ExtendWith({TestLoggerExtension.class})
+class PackagedProgramUtilsTest {
 
     @Test
-    public void testResolveURI() throws URISyntaxException {
+    void testResolveURI() throws URISyntaxException {
         final String relativeFile = "path/of/user.jar";
-        assertThat(resolveURI(relativeFile).getScheme(), is("file"));
-        assertThat(
-                resolveURI(relativeFile).getPath(),
-                is(new File(System.getProperty("user.dir"), 
relativeFile).getAbsolutePath()));
+        assertThat(resolveURI(relativeFile).getScheme()).isEqualTo("file");
+        assertThat(resolveURI(relativeFile).getPath())
+                .isEqualTo(
+                        new File(System.getProperty("user.dir"), 
relativeFile).getAbsolutePath());

Review comment:
       ```suggestion
           assertThat(resolveURI(relativeFile))
                   .hasScheme("file")
                   .hasPath(new File(System.getProperty("user.dir"), 
relativeFile).getAbsolutePath());
   ```
   There are URI assertions in AssertJ

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
##########
@@ -81,40 +81,42 @@ public void testJobClient() throws Exception {
                         .submitJob(cancellableJobGraph, 
ClassLoader.getSystemClassLoader())
                         .get();
 
-        assertThat(jobClient.getJobID(), is(cancellableJobGraph.getJobID()));
-        assertThat(jobClient.getJobStatus().get(), is(JobStatus.RUNNING));
+        
assertThat(jobClient.getJobID()).isEqualTo(cancellableJobGraph.getJobID());
+        
assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);

Review comment:
       ```suggestion
                   assertThat(jobClient.getJobStatus())
                   .succeedsWithin(Duration.ofSeconds(1))
                   .isEqualTo(JobStatus.RUNNING);
   ```
   A lot of these `Future` calls can be given max timeouts.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java
##########
@@ -48,11 +47,11 @@ private void runOutputTest(boolean suppressOutput, String[] 
expectedCapturedOutp
             // Flink will throw an error because no job graph will be 
generated by the main method.
             PackagedProgramUtils.getPipelineFromProgram(
                     packagedProgram, new Configuration(), 1, suppressOutput);
-            Assert.fail("This should have failed to create the Flink Plan.");
+            fail("This should have failed to create the Flink Plan.");

Review comment:
       Can be rewritten as a `assertThatThrownBy`?  If it turns out to be less 
clear, let's leave it like this.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
##########
@@ -20,42 +20,44 @@
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.File;
 import java.net.URISyntaxException;
 
 import static org.apache.flink.client.program.PackagedProgramUtils.resolveURI;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests {@link PackagedProgramUtils}.
  *
  * <p>See also {@link PackagedProgramUtilsPipelineTest} for tests that need to 
test behaviour of
  * {@link DataStream} and {@link DataSet} programs.
  */
-public class PackagedProgramUtilsTest {
+@ExtendWith({TestLoggerExtension.class})
+class PackagedProgramUtilsTest {
 
     @Test
-    public void testResolveURI() throws URISyntaxException {
+    void testResolveURI() throws URISyntaxException {
         final String relativeFile = "path/of/user.jar";
-        assertThat(resolveURI(relativeFile).getScheme(), is("file"));
-        assertThat(
-                resolveURI(relativeFile).getPath(),
-                is(new File(System.getProperty("user.dir"), 
relativeFile).getAbsolutePath()));
+        assertThat(resolveURI(relativeFile).getScheme()).isEqualTo("file");
+        assertThat(resolveURI(relativeFile).getPath())
+                .isEqualTo(
+                        new File(System.getProperty("user.dir"), 
relativeFile).getAbsolutePath());
 
         final String absoluteFile = "/path/of/user.jar";
-        assertThat(resolveURI(relativeFile).getScheme(), is("file"));
-        assertThat(resolveURI(absoluteFile).getPath(), is(absoluteFile));
+        assertThat(resolveURI(relativeFile).getScheme()).isEqualTo("file");

Review comment:
       ```suggestion
           
assertThat(resolveURI(absoluteFile)).hasScheme("file").hasPath(absoluteFile);
   ```
   I think this is an actual copy/paste error in the original test; it's 
testing `relativeFile` scheme twice.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
##########
@@ -618,19 +616,22 @@ public void 
testJobSubmissionRespectsConfiguredRetryPolicy() throws Exception {
                     
Objects.requireNonNull(restServerEndpoint.getServerAddress());
             try (RestClusterClient<?> restClusterClient =
                     createRestClusterClient(serverAddress.getPort(), 
clientConfig)) {
-                final ExecutionException exception =
-                        assertThrows(
-                                ExecutionException.class,
-                                () -> 
restClusterClient.submitJob(jobGraph).get());
-                assertThat(
-                        exception, 
FlinkMatchers.containsCause(FutureUtils.RetryException.class));
-                assertEquals(maxRetryAttempts + 1, failedRequest.get());
+                assertThatThrownBy(
+                                () -> {
+                                    
restClusterClient.submitJob(jobGraph).get();
+                                })
+                        .isInstanceOf(ExecutionException.class)
+                        .getCause()
+                        .getCause()
+                        .isInstanceOf(FutureUtils.RetryException.class);
+
+                assertThat(failedRequest.get()).isEqualTo(maxRetryAttempts + 
1);

Review comment:
       ```suggestion
                   assertThat(failedRequest).hasValue(maxRetryAttempts + 1);
   ```
   AtomicInteger assertions!

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
##########
@@ -54,23 +57,24 @@ public void testExtractContainedLibraries() throws 
Exception {
         }
 
         final List<File> files = 
PackagedProgram.extractContainedLibraries(fakeJar.toURI().toURL());
-        Assert.assertEquals(1, files.size());
-        Assert.assertArrayEquals(
-                nestedJarContent, 
Files.readAllBytes(files.iterator().next().toPath()));
+        assertThat(files.size()).isEqualTo(files.size());
+        assertThat(nestedJarContent)
+                
.isEqualTo(Files.readAllBytes(files.iterator().next().toPath()));

Review comment:
               assertThat(files)
                   .hasSize(1)
                   .allSatisfy(
                           f -> 
assertThat(f).content(ConfigConstants.DEFAULT_CHARSET).isEqualTo(s));
   

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
##########
@@ -146,27 +142,26 @@ public void testTriggerSavepointDefaultDirectory() throws 
Exception {
                     restClusterClient
                             .triggerSavepoint(new JobID(), null, 
SavepointFormatType.CANONICAL)
                             .get();
-            assertEquals(expectedReturnedSavepointDir, savepointPath);
+            assertThat(savepointPath).isEqualTo(expectedReturnedSavepointDir);
         }
     }
 
     @Test
-    public void testTriggerSavepointTargetDirectory() throws Exception {
+    void testTriggerSavepointTargetDirectory() throws Exception {
         final TriggerId triggerId = new TriggerId();
         final String expectedSubmittedSavepointDir = "world";
         final String expectedReturnedSavepointDir = "hello";
 
         try (final RestServerEndpoint restServerEndpoint =
                 createRestServerEndpoint(
                         triggerRequestBody -> {
-                            assertEquals(
-                                    expectedSubmittedSavepointDir,
-                                    
triggerRequestBody.getTargetDirectory().get());
-                            assertFalse(triggerRequestBody.isCancelJob());
+                            
assertThat(triggerRequestBody.getTargetDirectory().get())
+                                    .isEqualTo(expectedSubmittedSavepointDir);

Review comment:
       ```suggestion
                               
assertThat(triggerRequestBody.getTargetDirectory())
                                       .get()
                                       
.isEqualTo(expectedSubmittedSavepointDir);
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategyTest.java
##########
@@ -18,78 +18,76 @@
 
 package org.apache.flink.client.program.rest.retry;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for {@link ExponentialWaitStrategy}. */
-public class ExponentialWaitStrategyTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class ExponentialWaitStrategyTest {
 
     @Test
-    public void testNegativeInitialWait() {
+    void testNegativeInitialWait() {
         try {

Review comment:
       These should be rewritten as `assertThatThrowsBy` tests

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
##########
@@ -65,17 +64,17 @@ public void testGetExecutionPlan() {
                     new Optimizer(new DataStatistics(), new 
DefaultCostEstimator(), config);
             Plan plan = (Plan) 
PackagedProgramUtils.getPipelineFromProgram(prg, config, -1, true);
             OptimizedPlan op = optimizer.compile(plan);
-            assertNotNull(op);
+            assertThat(op).isNotNull();
 
             PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
-            assertNotNull(dumper.getOptimizerPlanAsJSON(op));
+            assertThat(dumper.getOptimizerPlanAsJSON(op)).isNotNull();
 
             // test HTML escaping
             PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
             dumper2.setEncodeForHTML(true);
             String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
 
-            assertEquals(-1, htmlEscaped.indexOf('\\'));
+            assertThat(htmlEscaped.indexOf('\\')).isEqualTo(-1);

Review comment:
       ```suggestion
               assertThat(htmlEscaped).doesNotContain("\\");
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
##########
@@ -481,18 +479,18 @@ public void testGetAccumulators() throws Exception {
                     
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
                 JobID id = new JobID();
                 Map<String, Object> accumulators = 
restClusterClient.getAccumulators(id).get();
-                assertNotNull(accumulators);
-                assertEquals(1, accumulators.size());
+                assertThat(accumulators).isNotNull();
+                assertThat(accumulators.size()).isEqualTo(1);

Review comment:
       ```suggestion
                   assertThat(accumulators).hasSize(1).containsEntry("testKey", 
"testValue");
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
##########
@@ -107,35 +103,35 @@
         REST_CONFIG = new UnmodifiableConfiguration(config);
     }
 
-    @BeforeClass
-    public static void setUp() throws ConfigurationException {
+    @BeforeAll
+    static void setUp() throws ConfigurationException {
         executor =
                 Executors.newSingleThreadExecutor(
                         new ExecutorThreadFactory(
                                 
RestClusterClientSavepointTriggerTest.class.getSimpleName()));
     }
 
-    @AfterClass
-    public static void tearDown() {
+    @AfterAll
+    static void tearDown() {
         if (executor != null) {
             executor.shutdown();
         }
     }
 
     @Test
-    public void testTriggerSavepointDefaultDirectory() throws Exception {
+    void testTriggerSavepointDefaultDirectory() throws Exception {
         final TriggerId triggerId = new TriggerId();
         final String expectedReturnedSavepointDir = "hello";
 
         try (final RestServerEndpoint restServerEndpoint =
                 createRestServerEndpoint(
                         request -> {
-                            
assertThat(request.getTargetDirectory().isPresent(), is(false));
-                            assertFalse(request.isCancelJob());
+                            
assertThat(request.getTargetDirectory().isPresent()).isFalse();

Review comment:
       ```suggestion
                               
assertThat(request.getTargetDirectory()).isEmpty();
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
##########
@@ -698,13 +699,12 @@ public void testSubmitJobAndWaitForExecutionResult() 
throws Exception {
                                 
.thenCompose(restClusterClient::requestJobResult)
                                 .get()
                                 
.toJobExecutionResult(ClassLoader.getSystemClassLoader());
-                assertTrue(firstExecutionResultPollFailed.get());
-                assertTrue(firstSubmitRequestFailed.get());
-                assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
-                assertThat(jobExecutionResult.getNetRuntime(), 
equalTo(Long.MAX_VALUE));
-                assertThat(
-                        jobExecutionResult.getAllAccumulatorResults(),
-                        equalTo(Collections.singletonMap("testName", 1.0)));
+                assertThat(firstExecutionResultPollFailed.get()).isTrue();
+                assertThat(firstSubmitRequestFailed.get()).isTrue();
+                assertThat(jobExecutionResult.getJobID()).isEqualTo(jobId);
+                
assertThat(jobExecutionResult.getNetRuntime()).isEqualTo(Long.MAX_VALUE);
+                assertThat(jobExecutionResult.getAllAccumulatorResults())
+                        .isEqualTo(Collections.singletonMap("testName", 1.0));

Review comment:
       ```suggestion
                   assertThat(jobExecutionResult.getAllAccumulatorResults())
                           .hasSize(1)
                           .containsEntry("testName", 1.0);
   ```

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
##########
@@ -81,40 +81,42 @@ public void testJobClient() throws Exception {
                         .submitJob(cancellableJobGraph, 
ClassLoader.getSystemClassLoader())
                         .get();
 
-        assertThat(jobClient.getJobID(), is(cancellableJobGraph.getJobID()));
-        assertThat(jobClient.getJobStatus().get(), is(JobStatus.RUNNING));
+        
assertThat(jobClient.getJobID()).isEqualTo(cancellableJobGraph.getJobID());
+        
assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
 
         jobClient.cancel().get();
 
-        assertThrows(
-                "Job was cancelled.",
-                ExecutionException.class,
-                () -> jobClient.getJobExecutionResult().get());
+        assertThatThrownBy(() -> jobClient.getJobExecutionResult().get())
+                .isInstanceOf(ExecutionException.class)
+                .hasMessageContaining("Job was cancelled.");
 
         assertThatMiniClusterIsShutdown();
     }
 
     @Test
-    public void testJobClientSavepoint() throws Exception {
+    void testJobClientSavepoint() throws Exception {
         PerJobMiniClusterFactory perJobMiniClusterFactory = 
initializeMiniCluster();
         JobClient jobClient =
                 perJobMiniClusterFactory
                         .submitJob(getCancellableJobGraph(), 
ClassLoader.getSystemClassLoader())
                         .get();
 
-        assertThrows(
-                "is not a streaming job.",
-                ExecutionException.class,
-                () -> jobClient.triggerSavepoint(null, 
SavepointFormatType.DEFAULT).get());
-
-        assertThrows(
-                "is not a streaming job.",
-                ExecutionException.class,
-                () -> jobClient.stopWithSavepoint(true, null, 
SavepointFormatType.DEFAULT).get());
+        assertThatThrownBy(

Review comment:
       Idem, `failsWithin` is the right choice for testing failed futures.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
##########
@@ -698,13 +699,12 @@ public void testSubmitJobAndWaitForExecutionResult() 
throws Exception {
                                 
.thenCompose(restClusterClient::requestJobResult)
                                 .get()
                                 
.toJobExecutionResult(ClassLoader.getSystemClassLoader());
-                assertTrue(firstExecutionResultPollFailed.get());
-                assertTrue(firstSubmitRequestFailed.get());
-                assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
-                assertThat(jobExecutionResult.getNetRuntime(), 
equalTo(Long.MAX_VALUE));
-                assertThat(
-                        jobExecutionResult.getAllAccumulatorResults(),
-                        equalTo(Collections.singletonMap("testName", 1.0)));
+                assertThat(firstExecutionResultPollFailed.get()).isTrue();
+                assertThat(firstSubmitRequestFailed.get()).isTrue();

Review comment:
       ```suggestion
                   assertThat(firstExecutionResultPollFailed)).isTrue();
                   assertThat(firstSubmitRequestFailed).isTrue();
   ```
   AtomicBoolean asserts don't need a get

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
##########
@@ -526,13 +524,13 @@ public void testRESTManualConfigurationOverride() throws 
Exception {
                                 .getClusterClient();
 
         URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
-        assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname));
-        assertThat(webMonitorBaseUrl.getPort(), equalTo(manualPort));
+        assertThat(webMonitorBaseUrl.getHost()).isEqualTo(manualHostname);
+        assertThat(webMonitorBaseUrl.getPort()).isEqualTo(manualPort);

Review comment:
       ```suggestion
           
assertThat(webMonitorBaseUrl).hasHost(manualHostname).hasPort(manualPort);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to