This is an automated email from the ASF dual-hosted git repository.

RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b43367edfb [FLINK-39527][JUnit5 Migration][e2e] Migrate flink-end-to-end-tests to JUnit 5 (#28040)
8b43367edfb is described below

commit 8b43367edfbbf1e7b8ef00fb719d0079e78bb557
Author: Mate Czagany <[email protected]>
AuthorDate: Mon May 11 04:07:30 2026 +0200

    [FLINK-39527][JUnit5 Migration][e2e] Migrate flink-end-to-end-tests to JUnit 5 (#28040)
    
    Co-authored-by: Yuepeng Pan <[email protected]>
    Co-authored-by: davidradl <[email protected]>
---
 .../org/apache/flink/sql/tests/BatchSQLTest.java   |  8 +-
 .../streaming/tests/AllroundMiniClusterTest.java   | 41 ++++-----
 .../flink/tests/util/cache/DownloadCache.java      | 14 +++-
 .../tests/util/cache/DownloadCacheExtension.java   | 58 +++++++++++++
 .../flink/tests/util/flink/FlinkResource.java      | 14 +++-
 .../tests/util/flink/FlinkResourceExtension.java   | 51 +++++++++++
 .../org/apache/flink/tests/util/TestUtilsTest.java | 28 ++++---
 .../flink/tests/util/util/FileUtilsTest.java       | 46 +++++-----
 .../flink/runtime/rest/RestClientITCase.java       | 10 ++-
 .../apache/flink/tests/scala/ScalaFreeITCase.java  | 52 +++++-------
 .../sql/CompileAndExecuteRemotePlanITCase.java     | 20 ++---
 .../flink/table/sql/CreateTableAsITCase.java       | 20 ++---
 .../org/apache/flink/table/sql/HdfsITCaseBase.java | 51 ++++++-----
 .../flink/table/sql/PlannerScalaFreeITCase.java    | 24 +++---
 .../org/apache/flink/table/sql/SqlITCaseBase.java  | 98 ++++++++++------------
 .../flink/table/sql/UsingRemoteJarITCase.java      | 55 ++++++------
 .../table/test/async/AsyncScalarFunctionTest.java  |  4 +-
 .../metrics/tests/MetricsAvailabilityITCase.java   | 36 ++++----
 .../tests/PrometheusReporterEndToEndITCase.java    | 93 ++++++++++----------
 .../src/test/java/SqlClientITCase.java             |  8 +-
 .../flink/table/gateway/SqlGatewayE2ECase.java     | 90 ++++++++++----------
 .../table/gateway/containers/HiveContainer.java    |  9 +-
 .../org/apache/flink/util/ExternalResource.java    | 61 --------------
 23 files changed, 477 insertions(+), 414 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
 
b/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
index 007bef8abd6..0c2ce4c4294 100644
--- 
a/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
+++ 
b/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
@@ -51,7 +51,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 @ExtendWith(TestLoggerExtension.class)
 class BatchSQLTest {
@@ -79,7 +79,7 @@ class BatchSQLTest {
                 // Only above shuffle modes are supported by the adaptive 
batch scheduler
                 // , "ALL_EXCHANGES_PIPELINE"
             })
-    public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path 
tmpDir) throws Exception {
+    void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) 
throws Exception {
         LOG.info("Results for this test will be stored at: {}", tmpDir);
 
         String sqlStatement = new String(Files.readAllBytes(sqlPath));
@@ -143,13 +143,13 @@ class BatchSQLTest {
                         .filter(file -> !file.isDirectory())
                         .map(File::getPath)
                         .collect(Collectors.toList());
-        assertEquals(1, files.size());
+        assertThat(files).hasSize(1);
         Path resultFile = Paths.get(files.get(0));
 
         LOG.info("Result found at {}", resultFile);
         String actual = new String(Files.readAllBytes(resultFile));
         LOG.info("Actual result is: '{}'", actual);
 
-        assertEquals(expected, actual);
+        assertThat(actual).isEqualTo(expected);
     }
 }
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
index eabe48b0639..c8ede888852 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
@@ -21,39 +21,41 @@ package org.apache.flink.streaming.tests;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
+import java.nio.file.Path;
 
 import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
 
 /** DataStreamAllroundTestProgram on MiniCluster for manual debugging 
purposes. */
-@Ignore("Test is already part of end-to-end tests. This is for manual 
debugging.")
-public class AllroundMiniClusterTest extends TestLogger {
+@Disabled("Test is already part of end-to-end tests. This is for manual 
debugging.")
+@ExtendWith(TestLoggerExtension.class)
+class AllroundMiniClusterTest {
 
-    @BeforeClass
-    public static void beforeClass() {
+    @BeforeAll
+    static void beforeClass() {
         org.apache.log4j.PropertyConfigurator.configure(
                 
AllroundMiniClusterTest.class.getClassLoader().getResource("log4j.properties"));
     }
 
-    @ClassRule
-    public static MiniClusterWithClientResource miniClusterResource =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(4)
                             .setNumberSlotsPerTaskManager(2)
                             .setConfiguration(createConfiguration())
                             .build());
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+    @TempDir private static Path temporaryFolder;
 
     private static Configuration createConfiguration() {
         Configuration configuration = new Configuration();
@@ -63,12 +65,13 @@ public class AllroundMiniClusterTest extends TestLogger {
     }
 
     @Test
-    public void runTest() throws Exception {
-        File checkpointDir = temporaryFolder.newFolder();
+    void runTest() throws Exception {
+        Path checkpointDir = temporaryFolder.resolve("checkpoints");
+        java.nio.file.Files.createDirectories(checkpointDir);
         DataStreamAllroundTestProgram.main(
                 new String[] {
                     "--environment.parallelism", "8",
-                    "--state_backend.checkpoint_directory", 
checkpointDir.toURI().toString(),
+                    "--state_backend.checkpoint_directory", 
checkpointDir.toUri().toString(),
                     "--state_backend", "rocks",
                     "--state_backend.rocks.incremental", "true",
                     "--test.simulate_failure", "true",
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java
index 687f5144a24..3fddc0fc187 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java
@@ -19,7 +19,6 @@
 package org.apache.flink.tests.util.cache;
 
 import org.apache.flink.tests.util.util.FactoryUtils;
-import org.apache.flink.util.ExternalResource;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -30,7 +29,18 @@ import java.nio.file.Path;
  *
  * <p>Whether, how, and for how long files are cached is 
implementation-dependent.
  */
-public interface DownloadCache extends ExternalResource {
+interface DownloadCache {
+
+    /** Initializes the cache before a test starts. */
+    void before() throws Exception;
+
+    /** Cleans up cache resources after a test completed successfully. */
+    void afterTestSuccess();
+
+    /** Cleans up cache resources after a test failed. */
+    default void afterTestFailure() {
+        afterTestSuccess();
+    }
 
     /**
      * Returns either a cached or newly downloaded version of the given file. 
The returned file path
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java
new file mode 100644
index 00000000000..024f2503331
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.cache;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/** JUnit 5 extension that wraps a {@link DownloadCache} and manages its 
lifecycle. */
+public class DownloadCacheExtension implements BeforeEachCallback, 
AfterEachCallback {
+
+    private final DownloadCache delegate;
+
+    public DownloadCacheExtension() {
+        this(DownloadCache.get());
+    }
+
+    DownloadCacheExtension(DownloadCache delegate) {
+        this.delegate = delegate;
+    }
+
+    public Path getOrDownload(String url, Path targetDir) throws IOException {
+        return delegate.getOrDownload(url, targetDir);
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        delegate.before();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) {
+        if (context.getExecutionException().isPresent()) {
+            delegate.afterTestFailure();
+        } else {
+            delegate.afterTestSuccess();
+        }
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
index 397d7428823..f72a779ab4c 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
@@ -20,7 +20,6 @@ package org.apache.flink.tests.util.flink;
 
 import org.apache.flink.test.util.JobSubmission;
 import org.apache.flink.tests.util.util.FactoryUtils;
-import org.apache.flink.util.ExternalResource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -30,7 +29,18 @@ import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 /** Generic interface for interacting with Flink. */
-public interface FlinkResource extends ExternalResource {
+public interface FlinkResource {
+
+    /** Initializes the resource before a test starts. */
+    void before() throws Exception;
+
+    /** Cleans up the resource after a test completed successfully. */
+    void afterTestSuccess();
+
+    /** Cleans up the resource after a test failed. */
+    default void afterTestFailure() {
+        afterTestSuccess();
+    }
 
     /**
      * Starts a cluster.
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java
new file mode 100644
index 00000000000..349a67f783f
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/** JUnit 5 extension that wraps a {@link FlinkResource} and manages its 
lifecycle. */
+public class FlinkResourceExtension implements BeforeEachCallback, 
AfterEachCallback {
+
+    private final FlinkResource delegate;
+
+    public FlinkResourceExtension(FlinkResource delegate) {
+        this.delegate = delegate;
+    }
+
+    public FlinkResource getFlinkResource() {
+        return delegate;
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        delegate.before();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) {
+        if (context.getExecutionException().isPresent()) {
+            delegate.afterTestFailure();
+        } else {
+            delegate.afterTestSuccess();
+        }
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java
index 717f5103d57..05a6b907ca0 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java
@@ -19,38 +19,40 @@ package org.apache.flink.tests.util;
 
 import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
 import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Tests for {@link TestUtils}. */
-public class TestUtilsTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class TestUtilsTest {
 
-    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private Path temporaryFolder;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupClass() {
         OperatingSystemRestriction.forbid(
                 "Symbolic links usually require special permissions on 
Windows.",
                 OperatingSystem.WINDOWS);
     }
 
     @Test
-    public void copyDirectory() throws IOException {
+    void copyDirectory() throws IOException {
         Path[] files = {
             Paths.get("file1"), Paths.get("dir1", "file2"),
         };
 
-        Path source = temporaryFolder.newFolder("source").toPath();
+        Path source = Files.createDirectory(temporaryFolder.resolve("source"));
         for (Path file : files) {
             Files.createDirectories(source.resolve(file).getParent());
             Files.createFile(source.resolve(file));
@@ -63,7 +65,7 @@ public class TestUtilsTest extends TestLogger {
         TestUtils.copyDirectory(symbolicLink, target);
 
         for (Path file : files) {
-            Assert.assertTrue(Files.exists(target.resolve(file)));
+            assertThat(target.resolve(file)).exists();
         }
     }
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
index d376432127e..f1d960cb60e 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
@@ -18,34 +18,32 @@
 package org.apache.flink.tests.util.util;
 
 import org.apache.flink.test.util.FileUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Tests for {@link FileUtils}. */
-public class FileUtilsTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class FileUtilsTest {
 
-    @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder();
+    private static final List<String> ORIGINAL_LINES = List.of("line1", 
"line2", "line3");
 
-    private static final List<String> ORIGINAL_LINES =
-            Collections.unmodifiableList(Arrays.asList("line1", "line2", 
"line3"));
     private Path testFile;
 
-    @Before
-    public void setupFile() throws IOException {
-        Path path = TMP.newFile().toPath();
+    @BeforeEach
+    void setupFile(@TempDir Path tmpDir) throws IOException {
+        Path path = Files.createTempFile(tmpDir, null, null);
 
         Files.write(path, ORIGINAL_LINES);
 
@@ -53,27 +51,25 @@ public class FileUtilsTest extends TestLogger {
     }
 
     @Test
-    public void replaceSingleMatch() throws IOException {
+    void replaceSingleMatch() throws IOException {
         FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> 
"removed");
 
-        Assert.assertEquals(
-                Arrays.asList("removed", ORIGINAL_LINES.get(1), 
ORIGINAL_LINES.get(2)),
-                Files.readAllLines(testFile));
+        assertThat(Files.readAllLines(testFile))
+                .containsExactly("removed", ORIGINAL_LINES.get(1), 
ORIGINAL_LINES.get(2));
     }
 
     @Test
-    public void replaceMultipleMatch() throws IOException {
+    void replaceMultipleMatch() throws IOException {
         FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> 
matcher.group(1));
 
-        Assert.assertEquals(Arrays.asList("1", "2", "3"), 
Files.readAllLines(testFile));
+        assertThat(Files.readAllLines(testFile)).containsExactly("1", "2", 
"3");
     }
 
     @Test
-    public void replaceWithEmptyLine() throws IOException {
+    void replaceWithEmptyLine() throws IOException {
         FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> "");
 
-        Assert.assertEquals(
-                Arrays.asList(ORIGINAL_LINES.get(0), "", 
ORIGINAL_LINES.get(2)),
-                Files.readAllLines(testFile));
+        assertThat(Files.readAllLines(testFile))
+                .containsExactly(ORIGINAL_LINES.get(0), "", 
ORIGINAL_LINES.get(2));
     }
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
index 57cbac116d1..fb3c77339e9 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
@@ -24,12 +24,13 @@ import 
org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.Executors;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.net.URL;
 import java.util.Collection;
@@ -38,10 +39,11 @@ import java.util.LinkedHashMap;
 import java.util.concurrent.TimeUnit;
 
 /** Tests for {@link RestClient} that rely on external connections. */
-public class RestClientITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class RestClientITCase {
 
     @Test
-    public void testHttpsConnectionWithDefaultCerts() throws Exception {
+    void testHttpsConnectionWithDefaultCerts() throws Exception {
         final Configuration config = new Configuration();
         final URL httpsUrl = new URL("https://raw.githubusercontent.com");
         TestUrlMessageHeaders testUrlMessageHeaders =
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
index e0ad66e0af8..f3ce565b2ba 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
@@ -21,15 +21,17 @@ import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.test.util.JobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.JarLocation;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.nio.file.Path;
 import java.time.Duration;
@@ -42,19 +44,19 @@ import java.util.function.Consumer;
  * Tests that Flink does not require Scala for jobs that do not use the Scala 
APIs. This covers both
  * pure Java jobs, and Scala jobs that use the Java APIs exclusively with 
Scala types.
  */
-@RunWith(Parameterized.class)
-public class ScalaFreeITCase extends TestLogger {
+@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class})
+class ScalaFreeITCase {
 
-    @Rule
-    public final TestExecutorResource<ScheduledExecutorService> 
testExecutorResource =
-            new TestExecutorResource<>(
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
TEST_EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
                     
java.util.concurrent.Executors::newSingleThreadScheduledExecutor);
 
-    @Rule public final FlinkResource flink;
+    @RegisterExtension private final FlinkResourceExtension flinkExtension;
     private final String mainClass;
 
-    @Parameterized.Parameters(name = "{index}: {0}")
-    public static Collection<TestParams> testParameters() {
+    @Parameters(name = "{0}")
+    private static Collection<TestParams> testParameters() {
         return Arrays.asList(
                 new TestParams("Java job, without Scala in lib/", 
JavaJob.class.getCanonicalName()),
                 new TestParams(
@@ -69,21 +71,21 @@ public class ScalaFreeITCase extends TestLogger {
                                         JarLocation.LIB)));
     }
 
-    public ScalaFreeITCase(TestParams testParams) {
+    private ScalaFreeITCase(TestParams testParams) {
         final FlinkResourceSetup.FlinkResourceSetupBuilder builder =
                 FlinkResourceSetup.builder()
                         .moveJar("flink-scala", JarLocation.LIB, 
JarLocation.OPT);
         testParams.builderSetup.accept(builder);
-        flink = FlinkResource.get(builder.build());
+        flinkExtension = new 
FlinkResourceExtension(FlinkResource.get(builder.build()));
         mainClass = testParams.mainClass;
     }
 
-    @Test
-    public void testScalaFreeJobExecution() throws Exception {
+    @TestTemplate
+    void testScalaFreeJobExecution() throws Exception {
         final Path jobJar = ResourceTestUtils.getResource("/jobs.jar");
 
-        try (final ClusterController clusterController = 
flink.startCluster(1)) {
-            // if the job fails then this throws an exception
+        try (final ClusterController clusterController =
+                flinkExtension.getFlinkResource().startCluster(1)) {
             clusterController.submitJob(
                     new JobSubmission.JobSubmissionBuilder(jobJar)
                             .setDetached(false)
@@ -93,7 +95,7 @@ public class ScalaFreeITCase extends TestLogger {
         }
     }
 
-    static class TestParams {
+    private static class TestParams {
 
         private final String description;
         private final String mainClass;
@@ -112,14 +114,6 @@ public class ScalaFreeITCase extends TestLogger {
             this.builderSetup = builderSetup;
         }
 
-        public String getMainClass() {
-            return mainClass;
-        }
-
-        public Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> 
getBuilderSetup() {
-            return builderSetup;
-        }
-
         @Override
         public String toString() {
             return description;
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java
index 439770cce24..5ad87f0f87c 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java
@@ -20,8 +20,7 @@ package org.apache.flink.table.sql;
 
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.junit.Assume;
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -32,27 +31,24 @@ import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** End-to-End tests for COMPILE AND EXECUTE PLAN statement with hdfs as 
remote uri. */
-public class CompileAndExecuteRemotePlanITCase extends HdfsITCaseBase {
+class CompileAndExecuteRemotePlanITCase extends HdfsITCaseBase {
 
     private static final String TABLE1 = "message";
     private static final String TABLE2 = "employee";
 
     private String planDir;
 
-    public CompileAndExecuteRemotePlanITCase(String executionMode) {
-        super(executionMode);
-    }
-
     @Override
-    protected void createHDFS() {
+    void createHDFS() {
         super.createHDFS();
         planDir = getRemotePlanDir();
     }
 
     @Override
-    protected Map<String, String> generateReplaceVars() {
+    Map<String, String> generateReplaceVars() {
         Map<String, String> varsMap = super.generateReplaceVars();
         varsMap.put("$REMOTE_PLAN_DIR", planDir);
         varsMap.put("$TABLE1", TABLE1);
@@ -60,10 +56,10 @@ public class CompileAndExecuteRemotePlanITCase extends 
HdfsITCaseBase {
         return varsMap;
     }
 
-    @Test
-    public void testCompileAndExecutePlan() throws Exception {
+    @TestTemplate
+    void testCompileAndExecutePlan() throws Exception {
         // COMPILE AND EXECUTE PLAN is not supported under batch mode
-        Assume.assumeTrue(executionMode.equals("streaming"));
+        assumeThat(executionMode).isEqualTo("streaming");
         Map<Path, List<String>> resultItems = new HashMap<>();
         resultItems.put(result.resolve(TABLE1), Arrays.asList("1,Meow", 
"2,Purr"));
         resultItems.put(result.resolve(TABLE2), Arrays.asList("1,Tom", 
"2,Jerry"));
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
index 06fbe73dd66..85a333a8e94 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
 
 import java.net.URI;
 import java.time.Duration;
@@ -37,7 +37,7 @@ import java.util.Collections;
 import java.util.List;
 
 /** End-to-End tests for create table as select syntax. */
-public class CreateTableAsITCase extends SqlITCaseBase {
+class CreateTableAsITCase extends SqlITCaseBase {
 
     private static final ResolvedSchema SINK_TABLE_SCHEMA =
             new ResolvedSchema(
@@ -54,29 +54,25 @@ public class CreateTableAsITCase extends SqlITCaseBase {
     private static final DebeziumJsonDeserializationSchema 
DESERIALIZATION_SCHEMA =
             createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
 
-    public CreateTableAsITCase(String executionMode) {
-        super(executionMode);
-    }
-
-    @Test
-    public void testCreateTableAs() throws Exception {
+    @TestTemplate
+    void testCreateTableAs() throws Exception {
         runAndCheckSQL("create_table_as_e2e.sql", Arrays.asList("+I[Bob, 2]", 
"+I[Alice, 1]"));
     }
 
-    @Test
-    public void testCreateTableAsInStatementSet() throws Exception {
+    @TestTemplate
+    void testCreateTableAsInStatementSet() throws Exception {
         runAndCheckSQL(
                 "create_table_as_statementset_e2e.sql",
                 Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
     }
 
     @Override
-    protected List<String> formatRawResult(List<String> rawResult) {
+    List<String> formatRawResult(List<String> rawResult) {
         return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, 
DESERIALIZATION_SCHEMA);
     }
 
     @Override
-    protected void executeSqlStatements(
+    void executeSqlStatements(
             ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
index 8aae8efcc71..0342c6bd517 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
@@ -27,11 +27,9 @@ import org.apache.flink.util.OperatingSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -41,38 +39,37 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.fail;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
 /** Base class for sql ITCase which depends on HDFS env. */
-public abstract class HdfsITCaseBase extends SqlITCaseBase {
+abstract class HdfsITCaseBase extends SqlITCaseBase {
 
     private static final Path HADOOP_CLASSPATH =
             ResourceTestUtils.getResource(".*hadoop.classpath");
 
-    protected Configuration hdConf;
-    protected MiniDFSCluster hdfsCluster;
-
-    public HdfsITCaseBase(String executionMode) {
-        super(executionMode);
-    }
+    Configuration hdConf;
+    MiniDFSCluster hdfsCluster;
 
-    @BeforeClass
-    public static void verifyOS() {
-        Assume.assumeTrue(
-                "HDFS cluster cannot be started on Windows without extensions.",
-                !OperatingSystem.isWindows());
+    @BeforeAll
+    static void verifyOS() {
+        assumeThat(OperatingSystem.isWindows())
+                .as("HDFS cluster cannot be started on Windows without extensions.")
+                .isFalse();
     }
 
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    void before() throws Exception {
         super.before();
         createHDFS();
     }
 
-    @After
-    public void after() {
+    @AfterEach
+    void after() {
         destroyHDFS();
     }
 
-    protected void createHDFS() {
+    void createHDFS() {
         try {
             hdConf = new Configuration();
             File baseDir =
@@ -83,16 +80,16 @@ public abstract class HdfsITCaseBase extends SqlITCaseBase {
             MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
             hdfsCluster = builder.build();
         } catch (Throwable e) {
-            Assert.fail("Failed to create HDFS env" + e.getMessage());
+            fail("Failed to create HDFS env" + e.getMessage(), e);
         }
     }
 
-    protected void destroyHDFS() {
+    void destroyHDFS() {
         hdfsCluster.shutdown();
     }
 
     @Override
-    protected void executeSqlStatements(
+    void executeSqlStatements(
             ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
@@ -116,7 +113,7 @@ public abstract class HdfsITCaseBase extends SqlITCaseBase {
                                 "File that contains hadoop classpath %s does not exist.",
                                 HADOOP_CLASSPATH));
             } catch (FileNotFoundException e) {
-                Assert.fail("Test failed " + e.getMessage());
+                fail("Test failed " + e.getMessage(), e);
             }
         }
 
@@ -124,7 +121,7 @@ public abstract class HdfsITCaseBase extends SqlITCaseBase {
         try {
             classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
         } catch (IOException e) {
-            Assert.fail("Test failed " + e.getMessage());
+            fail("Test failed " + e.getMessage(), e);
         }
         return classPathContent;
     }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
index dbdbae5b551..51a0f278cb3 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
 
 import java.net.URI;
 import java.time.Duration;
@@ -36,7 +36,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.junit.Assume.assumeTrue;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /**
  * End-to-End tests for table planner scala-free since 1.15. Due to scala-free 
of table planner
@@ -44,7 +44,7 @@ import static org.junit.Assume.assumeTrue;
  * class in execution time, ClassNotFound exception will be thrown. ITCase in 
table planner can not
  * cover it, so we should add E2E test for these case.
  */
-public class PlannerScalaFreeITCase extends SqlITCaseBase {
+class PlannerScalaFreeITCase extends SqlITCaseBase {
 
     private static final ResolvedSchema SINK_TABLE_SCHEMA =
             new ResolvedSchema(
@@ -61,29 +61,25 @@ public class PlannerScalaFreeITCase extends SqlITCaseBase {
     private static final DebeziumJsonDeserializationSchema 
DESERIALIZATION_SCHEMA =
             createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
 
-    public PlannerScalaFreeITCase(String executionMode) {
-        super(executionMode);
-    }
-
-    @Test
-    public void testImperativeUdaf() throws Exception {
+    @TestTemplate
+    void testImperativeUdaf() throws Exception {
         runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]", 
"+I[Alice, 1]"));
     }
 
     /** The test data is from {@link 
org.apache.flink.table.toolbox.TestSourceFunction#DATA}. */
-    @Test
-    public void testWatermarkPushDown() throws Exception {
-        assumeTrue(executionMode.equalsIgnoreCase("streaming"));
+    @TestTemplate
+    void testWatermarkPushDown() throws Exception {
+        assumeThat(executionMode).isEqualTo("streaming");
         runAndCheckSQL("watermark_push_down_e2e.sql", Arrays.asList("+I[Bob, 
1]", "+I[Alice, 2]"));
     }
 
     @Override
-    protected List<String> formatRawResult(List<String> rawResult) {
+    List<String> formatRawResult(List<String> rawResult) {
         return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, 
DESERIALIZATION_SCHEMA);
     }
 
     @Override
-    protected void executeSqlStatements(
+    void executeSqlStatements(
             ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
index 41d1f55a1b2..31ac671344b 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
@@ -30,19 +30,21 @@ import org.apache.flink.table.data.conversion.RowRowConverter;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,38 +69,33 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Base class for sql ITCase. */
-@RunWith(Parameterized.class)
-public abstract class SqlITCaseBase extends TestLogger {
+@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class})
+abstract class SqlITCaseBase {
     private static final Logger LOG = 
LoggerFactory.getLogger(SqlITCaseBase.class);
 
-    @Parameterized.Parameters(name = "executionMode")
-    public static Collection<String> data() {
+    @Parameters(name = "executionMode")
+    private static Collection<String> data() {
         return Arrays.asList("streaming", "batch");
     }
 
-    @Rule
-    public final FlinkResource flink =
-            new LocalStandaloneFlinkResourceFactory()
-                    .create(
-                            FlinkResourceSetup.builder()
-                                    .addConfiguration(getConfiguration())
-                                    .build());
+    @RegisterExtension
+    private final FlinkResourceExtension flinkExtension =
+            new FlinkResourceExtension(
+                    new LocalStandaloneFlinkResourceFactory()
+                            .create(
+                                    FlinkResourceSetup.builder()
+                                            
.addConfiguration(getConfiguration())
+                                            .build()));
 
-    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+    @TempDir private Path tmp;
 
-    protected final String executionMode;
+    @Parameter String executionMode;
 
-    protected Path result;
+    Path result;
 
-    protected static final Path SQL_TOOL_BOX_JAR =
-            ResourceTestUtils.getResource(".*SqlToolbox.jar");
-
-    public SqlITCaseBase(String executionMode) {
-        this.executionMode = executionMode;
-    }
+    static final Path SQL_TOOL_BOX_JAR = 
ResourceTestUtils.getResource(".*SqlToolbox.jar");
 
     private static Configuration getConfiguration() {
         // we have to enable checkpoint to trigger flushing for filesystem sink
@@ -107,18 +104,17 @@ public abstract class SqlITCaseBase extends TestLogger {
         return flinkConfig;
     }
 
-    @Before
-    public void before() throws Exception {
-        Path tmpPath = tmp.getRoot().toPath();
-        LOG.info("The current temporary path: {}", tmpPath);
-        this.result = tmpPath.resolve(String.format("result-%s", 
UUID.randomUUID()));
+    @BeforeEach
+    void before() throws Exception {
+        LOG.info("The current temporary path: {}", tmp);
+        this.result = tmp.resolve(String.format("result-%s", 
UUID.randomUUID()));
     }
 
-    public void runAndCheckSQL(String sqlPath, List<String> resultItems) 
throws Exception {
+    void runAndCheckSQL(String sqlPath, List<String> resultItems) throws 
Exception {
         runAndCheckSQL(sqlPath, Collections.singletonMap(result, resultItems));
     }
 
-    public void runAndCheckSQL(
+    void runAndCheckSQL(
             String sqlPath,
             List<String> resultItems,
             Function<List<String>, List<String>> formatter)
@@ -130,18 +126,18 @@ public abstract class SqlITCaseBase extends TestLogger {
                 Collections.emptyList());
     }
 
-    public void runAndCheckSQL(String sqlPath, Map<Path, List<String>> 
resultItems)
-            throws Exception {
+    void runAndCheckSQL(String sqlPath, Map<Path, List<String>> resultItems) 
throws Exception {
         runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap(), 
Collections.emptyList());
     }
 
-    public void runAndCheckSQL(
+    void runAndCheckSQL(
             String sqlPath,
             Map<Path, List<String>> resultItems,
             Map<Path, Function<List<String>, List<String>>> formatters,
             List<URI> dependencies)
             throws Exception {
-        try (ClusterController clusterController = flink.startCluster(1)) {
+        try (ClusterController clusterController =
+                flinkExtension.getFlinkResource().startCluster(1)) {
             List<String> sqlLines = initializeSqlLines(sqlPath);
 
             executeSqlStatements(clusterController, sqlLines, dependencies);
@@ -158,14 +154,14 @@ public abstract class SqlITCaseBase extends TestLogger {
         }
     }
 
-    protected Map<String, String> generateReplaceVars() {
+    Map<String, String> generateReplaceVars() {
         Map<String, String> varsMap = new HashMap<>();
         varsMap.put("$RESULT", this.result.toAbsolutePath().toString());
         varsMap.put("$MODE", this.executionMode);
         return varsMap;
     }
 
-    protected abstract void executeSqlStatements(
+    abstract void executeSqlStatements(
             ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
             throws Exception;
 
@@ -200,9 +196,7 @@ public abstract class SqlITCaseBase extends TestLogger {
                 lines = readResultFiles(resultPath);
                 try {
                     List<String> actual = resultFormatter.apply(lines);
-                    assertThat(actual)
-                            .hasSameSizeAs(expectedItems)
-                            
.containsExactlyInAnyOrderElementsOf(expectedItems);
+                    
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedItems);
                     success = true;
                     break;
                 } catch (AssertionError e) {
@@ -217,10 +211,10 @@ public abstract class SqlITCaseBase extends TestLogger {
             }
             Thread.sleep(500);
         }
-        assertTrue(
-                success,
-                String.format(
-                        "Did not get expected results before timeout, actual result: %s.", lines));
+        assertThat(success)
+                .withFailMessage(
+                        "Did not get expected results before timeout, actual result: %s.", lines)
+                .isTrue();
     }
 
     private static List<String> readResultFiles(Path path) throws Exception {
@@ -242,16 +236,16 @@ public abstract class SqlITCaseBase extends TestLogger {
      *
      * <pre>{@code
      * @Override
-     * protected List<String> formatRawResult(List<String> rawResults) {
+     * List<String> formatRawResult(List<String> rawResults) {
      *     return convertToMaterializedResult(rawResults, schema, 
deserializationSchema);
      * }
      * }</pre>
      */
-    protected List<String> formatRawResult(List<String> rawResults) {
+    List<String> formatRawResult(List<String> rawResults) {
         return rawResults;
     }
 
-    protected static List<String> convertToMaterializedResult(
+    static List<String> convertToMaterializedResult(
             List<String> rawResults,
             ResolvedSchema schema,
             DeserializationSchema<RowData> deserializationSchema) {
@@ -298,7 +292,7 @@ public abstract class SqlITCaseBase extends TestLogger {
      * Create a DebeziumJsonDeserializationSchema using the given {@link 
ResolvedSchema} to convert
      * debezium-json formatted record into {@link RowData}.
      */
-    protected static DebeziumJsonDeserializationSchema 
createDebeziumDeserializationSchema(
+    static DebeziumJsonDeserializationSchema 
createDebeziumDeserializationSchema(
             ResolvedSchema schema) {
         return new DebeziumJsonDeserializationSchema(
                 schema.toPhysicalRowDataType(),
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
index 0432aa76ecc..6a3ae1f8130 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
@@ -26,19 +26,21 @@ import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestTemplate;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.assertj.core.api.Assertions.fail;
+
 /** End-to-End tests for using remote jar. */
-public class UsingRemoteJarITCase extends HdfsITCaseBase {
+class UsingRemoteJarITCase extends HdfsITCaseBase {
 
     private static final ResolvedSchema USER_ORDER_SCHEMA =
             new ResolvedSchema(
@@ -55,16 +57,17 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
     private static final DebeziumJsonDeserializationSchema 
USER_ORDER_DESERIALIZATION_SCHEMA =
             createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
 
-    @Rule public TestName name = new TestName();
+    private TestInfo testInfo;
     private org.apache.hadoop.fs.Path hdPath;
     private org.apache.hadoop.fs.FileSystem hdfs;
 
-    public UsingRemoteJarITCase(String executionMode) {
-        super(executionMode);
+    @BeforeEach
+    void captureTestInfo(TestInfo testInfo) {
+        this.testInfo = testInfo;
     }
 
     @Override
-    protected void createHDFS() {
+    void createHDFS() {
         super.createHDFS();
         hdPath = new org.apache.hadoop.fs.Path("/test.jar");
         try {
@@ -73,22 +76,22 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
             hdfs.copyFromLocalFile(
                     new 
org.apache.hadoop.fs.Path(SQL_TOOL_BOX_JAR.toString()), hdPath);
         } catch (IOException e) {
-            Assert.fail("Failed to copy local test.jar to HDFS env" + 
e.getMessage());
+            fail("Failed to copy local test.jar to HDFS env" + e.getMessage(), 
e);
         }
     }
 
     @Override
-    protected void destroyHDFS() {
+    void destroyHDFS() {
         try {
             hdfs.delete(hdPath, false);
         } catch (IOException e) {
-            Assert.fail("Failed to cleanup HDFS path" + e.getMessage());
+            fail("Failed to cleanup HDFS path" + e.getMessage(), e);
         }
         super.destroyHDFS();
     }
 
-    @Test
-    public void testUdfInRemoteJar() throws Exception {
+    @TestTemplate
+    void testUdfInRemoteJar() throws Exception {
         runAndCheckSQL(
                 "remote_jar_e2e.sql",
                 Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
@@ -97,8 +100,8 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
                                 raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
-    @Test
-    public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception 
{
+    @TestTemplate
+    void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception {
         runAndCheckSQL(
                 "sql_client_remote_jar_e2e.sql",
                 Collections.singletonMap(result, Arrays.asList("+I[Bob, 2]", 
"+I[Alice, 1]")),
@@ -116,16 +119,16 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
                                         hdPath))));
     }
 
-    @Test
-    public void testScalarUdfWhenCheckpointEnable() throws Exception {
+    @TestTemplate
+    void testScalarUdfWhenCheckpointEnable() throws Exception {
         runAndCheckSQL(
                 "scalar_udf_e2e.sql",
                 Collections.singletonList(
                         "{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello Flink\"},\"op\":\"c\"}"));
     }
 
-    @Test
-    public void testCreateTemporarySystemFunctionUsingRemoteJar() throws 
Exception {
+    @TestTemplate
+    void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
                 Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
@@ -134,8 +137,8 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
                                 raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
-    @Test
-    public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
+    @TestTemplate
+    void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
                 Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
@@ -144,8 +147,8 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
                                 raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
-    @Test
-    public void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws 
Exception {
+    @TestTemplate
+    void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception {
         Map<String, String> replaceVars = generateReplaceVars();
         replaceVars.put("$TEMPORARY", "TEMPORARY");
         runAndCheckSQL(
@@ -157,7 +160,7 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
     }
 
     @Override
-    protected Map<String, String> generateReplaceVars() {
+    Map<String, String> generateReplaceVars() {
         String remoteJarPath =
                 String.format(
                         "hdfs://%s:%s/%s",
@@ -165,7 +168,7 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
 
         Map<String, String> varsMap = super.generateReplaceVars();
         varsMap.put("$JAR_PATH", remoteJarPath);
-        String methodName = name.getMethodName();
+        String methodName = 
testInfo.getTestMethod().map(Method::getName).orElse("");
         if (methodName.startsWith("testCreateTemporarySystemFunction")) {
             varsMap.put("$TEMPORARY", "TEMPORARY SYSTEM");
         } else if 
(methodName.startsWith("testCreateTemporaryCatalogFunction")) {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
index b527d11c29d..876202240a4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
@@ -26,10 +26,10 @@ import org.junit.jupiter.api.Test;
  * <p>This test verifies that AsyncScalarFunction works correctly. The test 
passes if the
  * application runs without errors.
  */
-public class AsyncScalarFunctionTest {
+class AsyncScalarFunctionTest {
 
     @Test
-    public void testAsyncScalarFunction() throws Exception {
+    void testAsyncScalarFunction() throws Exception {
         // Create and run the example application
         AsyncScalarFunctionExample example = new AsyncScalarFunctionExample();
         example.execute();
diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
index 6200b52b87e..170a08598da 100644
--- a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
@@ -33,18 +33,19 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import javax.annotation.Nullable;
 
@@ -60,33 +61,36 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /** End-to-end test for the availability of metrics. */
-public class MetricsAvailabilityITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class MetricsAvailabilityITCase {
 
     private static final String HOST = "localhost";
     private static final int PORT = 8081;
 
-    @Rule
-    public final FlinkResource dist =
-            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+    @RegisterExtension
+    private final FlinkResourceExtension dist =
+            new FlinkResourceExtension(
+                    new LocalStandaloneFlinkResourceFactory()
+                            .create(FlinkResourceSetup.builder().build()));
 
     @Nullable private static ScheduledExecutorService scheduledExecutorService 
= null;
 
-    @BeforeClass
-    public static void startExecutor() {
+    @BeforeAll
+    static void startExecutor() {
         scheduledExecutorService = Executors.newScheduledThreadPool(4);
     }
 
-    @AfterClass
-    public static void shutdownExecutor() {
+    @AfterAll
+    static void shutdownExecutor() {
         if (scheduledExecutorService != null) {
             scheduledExecutorService.shutdown();
         }
     }
 
     @Test
-    public void testReporter() throws Exception {
+    void testReporter() throws Exception {
         final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(10));
-        try (ClusterController ignored = dist.startCluster(1)) {
+        try (ClusterController ignored = 
dist.getFlinkResource().startCluster(1)) {
             final RestClient restClient =
                     new RestClient(new Configuration(), 
scheduledExecutorService);
 
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 5556aad1b83..db527f0ee74 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -23,29 +23,29 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.prometheus.PrometheusReporterFactory;
 import org.apache.flink.tests.util.AutoClosableProcess;
 import org.apache.flink.tests.util.CommandLineWrapper;
-import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.tests.util.cache.DownloadCacheExtension;
 import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.JarLocation;
 import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.ProcessorArchitecture;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,24 +60,29 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
 import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** End-to-end test for the PrometheusReporter. */
-@RunWith(Parameterized.class)
-public class PrometheusReporterEndToEndITCase extends TestLogger {
+@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class})
+class PrometheusReporterEndToEndITCase {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    private static final String PROMETHEUS_VERSION = "2.4.3";
-    private static final String PROMETHEUS_FILE_NAME;
+    private static final String PROMETHEUS_VERSION = "3.11.2";
     private static final String PROMETHEUS_JAR_PREFIX = 
"flink-metrics-prometheus";
 
-    static {
-        final String base = "prometheus-" + PROMETHEUS_VERSION + '.';
-        final String os;
-        final String platform;
+    private static String prometheusFileName;
+
+    private static final Pattern LOG_REPORTER_PORT_PATTERN =
+            Pattern.compile(".*Started PrometheusReporter HTTP server on port 
([0-9]+).*");
+
+    private static String getPrometheusFileName() {
+        String base = "prometheus-" + PROMETHEUS_VERSION + '.';
+        String os;
+        String platform;
         switch (OperatingSystem.getCurrentOperatingSystem()) {
             case MAC_OS:
                 os = "darwin";
@@ -107,19 +112,17 @@ public class PrometheusReporterEndToEndITCase extends 
TestLogger {
                 break;
         }
 
-        PROMETHEUS_FILE_NAME = base + os + "-" + platform;
+        return String.format("%s%s-%s", base, os, platform);
     }
 
-    private static final Pattern LOG_REPORTER_PORT_PATTERN =
-            Pattern.compile(".*Started PrometheusReporter HTTP server on port 
([0-9]+).*");
-
-    @BeforeClass
-    public static void checkOS() {
-        Assume.assumeFalse("This test does not run on Windows.", 
OperatingSystem.isWindows());
+    @BeforeAll
+    static void beforeAll() {
+        assumeThat(OperatingSystem.isWindows()).as("This test does not run on 
Windows.").isFalse();
+        prometheusFileName = getPrometheusFileName();
     }
 
-    @Parameterized.Parameters(name = "{index}: {0}")
-    public static Collection<TestParams> testParameters() {
+    @Parameters(name = "{0}")
+    static Collection<TestParams> testParameters() {
         return Arrays.asList(
                 TestParams.from(
                         "Jar in 'lib'",
@@ -137,18 +140,21 @@ public class PrometheusReporterEndToEndITCase extends 
TestLogger {
                         }));
     }
 
-    @Rule public final FlinkResource dist;
+    @RegisterExtension private final FlinkResourceExtension dist;
 
-    public PrometheusReporterEndToEndITCase(TestParams params) {
+    PrometheusReporterEndToEndITCase(TestParams params) {
         final FlinkResourceSetup.FlinkResourceSetupBuilder builder = 
FlinkResourceSetup.builder();
         params.getBuilderSetup().accept(builder);
         builder.addConfiguration(getFlinkConfig());
-        dist = new 
LocalStandaloneFlinkResourceFactory().create(builder.build());
+        dist =
+                new FlinkResourceExtension(
+                        new 
LocalStandaloneFlinkResourceFactory().create(builder.build()));
     }
 
-    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+    @TempDir private Path tmp;
 
-    @Rule public final DownloadCache downloadCache = DownloadCache.get();
+    @RegisterExtension
+    private final DownloadCacheExtension downloadCacheExtension = new 
DownloadCacheExtension();
 
     private static Configuration getFlinkConfig() {
         final Configuration config = new Configuration();
@@ -162,20 +168,20 @@ public class PrometheusReporterEndToEndITCase extends 
TestLogger {
         return config;
     }
 
-    @Test
-    public void testReporter() throws Exception {
-        final Path tmpPrometheusDir = 
tmp.newFolder().toPath().resolve("prometheus");
-        final Path prometheusBinDir = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
+    @TestTemplate
+    void testReporter() throws Exception {
+        final Path tmpPrometheusDir = tmp.resolve("prometheus");
+        final Path prometheusBinDir = 
tmpPrometheusDir.resolve(prometheusFileName);
         final Path prometheusConfig = 
prometheusBinDir.resolve("prometheus.yml");
         final Path prometheusBinary = prometheusBinDir.resolve("prometheus");
         Files.createDirectory(tmpPrometheusDir);
 
         final Path prometheusArchive =
-                downloadCache.getOrDownload(
+                downloadCacheExtension.getOrDownload(
                         
"https://github.com/prometheus/prometheus/releases/download/v";
                                 + PROMETHEUS_VERSION
                                 + '/'
-                                + PROMETHEUS_FILE_NAME
+                                + prometheusFileName
                                 + ".tar.gz",
                         tmpPrometheusDir);
 
@@ -193,10 +199,11 @@ public class PrometheusReporterEndToEndITCase extends 
TestLogger {
                         .inPlace()
                         .build());
 
-        try (ClusterController ignored = dist.startCluster(1)) {
+        try (ClusterController ignored = 
dist.getFlinkResource().startCluster(1)) {
 
             final List<Integer> ports =
-                    dist.searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> 
matcher.group(1))
+                    dist.getFlinkResource()
+                            .searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher 
-> matcher.group(1))
                             .map(Integer::valueOf)
                             .collect(Collectors.toList());
 
@@ -288,7 +295,7 @@ public class PrometheusReporterEndToEndITCase extends 
TestLogger {
                 "Could not retrieve metric " + metric + " from Prometheus.", 
reportedException);
     }
 
-    static class TestParams {
+    private static class TestParams {
         private final String jarLocationDescription;
         private final Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> 
builderSetup;
 
@@ -299,13 +306,13 @@ public class PrometheusReporterEndToEndITCase extends 
TestLogger {
             this.builderSetup = builderSetup;
         }
 
-        public static TestParams from(
+        private static TestParams from(
                 String jarLocationDesription,
                 Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> 
builderSetup) {
             return new TestParams(jarLocationDesription, builderSetup);
         }
 
-        public Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> 
getBuilderSetup() {
+        private Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> 
getBuilderSetup() {
             return builderSetup;
         }
 
diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
index 676846527bd..3ff92b30a7b 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
+++ b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
@@ -62,7 +62,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** E2E Test for SqlClient. */
 @Testcontainers
-public class SqlClientITCase {
+class SqlClientITCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SqlClientITCase.class);
 
@@ -76,16 +76,16 @@ public class SqlClientITCase {
     private final Path sqlConnectorUpsertTestJar =
             ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar");
 
-    public static final Network NETWORK = Network.newNetwork();
+    private static final Network NETWORK = Network.newNetwork();
 
     @Container
-    public static final KafkaContainer KAFKA =
+    private static final KafkaContainer KAFKA =
             new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
                     .withLogConsumer(LOG_CONSUMER);
 
-    public final FlinkContainers flink =
+    private final FlinkContainers flink =
             FlinkContainers.builder()
                     .withFlinkContainersSettings(
                             FlinkContainersSettings.builder()
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
index 0d7387b5803..3bf654b97af 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
@@ -31,22 +31,24 @@ import org.apache.flink.test.util.SQLJobClientMode;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 import org.apache.flink.tests.util.flink.FlinkDistribution;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.GatewayController;
 import org.apache.flink.tests.util.flink.JarLocation;
 import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -67,6 +69,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TimeZone;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -76,10 +79,12 @@ import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.ADD
 import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.PORT;
 import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampMillis;
 import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** E2E Tests for {@code SqlGateway} with {@link HiveServer2Endpoint}. */
-public class SqlGatewayE2ECase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+class SqlGatewayE2ECase {
 
     private static final Path HIVE_SQL_CONNECTOR_JAR =
             
ResourceTestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
@@ -92,9 +97,9 @@ public class SqlGatewayE2ECase extends TestLogger {
     private static final Configuration ENDPOINT_CONFIG = new Configuration();
     private static final String RESULT_KEY = "$RESULT";
 
-    @ClassRule public static final TemporaryFolder FOLDER = new 
TemporaryFolder();
-    @ClassRule public static final HiveContainer HIVE_CONTAINER = new 
HiveContainer();
-    @Rule public final FlinkResource flinkResource = buildFlinkResource();
+    @TempDir private static Path FOLDER;
+    @Container private static final HiveContainer HIVE_CONTAINER = new 
HiveContainer();
+    @RegisterExtension private final FlinkResourceExtension 
flinkResourceExtension = buildFlinkResource();
 
     private static NetUtils.Port hiveserver2Port;
     private static NetUtils.Port restPort;
@@ -105,20 +110,20 @@ public class SqlGatewayE2ECase extends TestLogger {
     private static final AtomicInteger CATALOG_COUNTER = new AtomicInteger(0);
     private static String filesystemCatalogName;
 
-    @BeforeClass
-    public static void beforeClass() {
+    @BeforeAll
+    static void beforeClass() {
         ENDPOINT_CONFIG.setString(
                 getPrefixedConfigOptionName(CATALOG_HIVE_CONF_DIR), 
createHiveConf().getParent());
     }
 
-    @AfterClass
-    public static void afterClass() throws Exception {
+    @AfterAll
+    static void afterClass() throws Exception {
         hiveserver2Port.close();
         restPort.close();
     }
 
     @Test
-    public void testHiveServer2ExecuteStatement() throws Exception {
+    void testHiveServer2ExecuteStatement() throws Exception {
         executeStatement(
                 SQLJobClientMode.getHiveJDBC(
                         InetAddress.getByName("localhost").getHostAddress(),
@@ -126,7 +131,7 @@ public class SqlGatewayE2ECase extends TestLogger {
     }
 
     @Test
-    public void testRestExecuteStatement() throws Exception {
+    void testRestExecuteStatement() throws Exception {
         executeStatement(
                 SQLJobClientMode.getRestClient(
                         InetAddress.getByName("localhost").getHostAddress(),
@@ -135,19 +140,19 @@ public class SqlGatewayE2ECase extends TestLogger {
     }
 
     @Test
-    public void testSqlClientExecuteStatement() throws Exception {
+    void testSqlClientExecuteStatement() throws Exception {
         executeStatement(
                 SQLJobClientMode.getGatewaySqlClient(
                         InetAddress.getByName("localhost").getHostAddress(), 
restPort.getPort()));
     }
 
     @Test
-    public void testMaterializedTableInContinuousMode() throws Exception {
+    void testMaterializedTableInContinuousMode() throws Exception {
         Duration continuousWaitTime = Duration.ofMinutes(5);
         Duration continuousWaitPause = Duration.ofSeconds(10);
 
-        try (GatewayController gateway = flinkResource.startSqlGateway();
-                ClusterController ignore = flinkResource.startCluster(2)) {
+        try (GatewayController gateway = 
flinkResourceExtension.getFlinkResource().startSqlGateway();
+                ClusterController ignore = 
flinkResourceExtension.getFlinkResource().startCluster(2)) {
 
             FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
                     initSessionWithCatalogStore(Collections.emptyMap());
@@ -199,12 +204,12 @@ public class SqlGatewayE2ECase extends TestLogger {
                     continuousWaitPause,
                     "Failed to wait for the result");
 
-            File savepointFolder = FOLDER.newFolder("savepoint");
+            Path savepointFolder = 
Files.createDirectory(FOLDER.resolve("savepoint-" + UUID.randomUUID()));
             // configure savepoint path
             gatewayRestClient.executeStatementWithResult(
                     String.format(
                             "set 
'execution.checkpointing.savepoint-dir'='file://%s'",
-                            savepointFolder.getAbsolutePath()));
+                            savepointFolder.toAbsolutePath()));
 
             // suspend the materialized table
             gatewayRestClient.executeStatementWithResult(
@@ -239,13 +244,13 @@ public class SqlGatewayE2ECase extends TestLogger {
     }
 
     @Test
-    public void testMaterializedTableInFullMode() throws Exception {
+    void testMaterializedTableInFullMode() throws Exception {
         Duration fullModeWaitTime = Duration.ofMinutes(5);
         Duration fullModeWaitPause = Duration.ofSeconds(10);
 
         // init session
-        try (GatewayController gateway = flinkResource.startSqlGateway();
-                ClusterController ignore = flinkResource.startCluster(2)) {
+        try (GatewayController gateway = 
flinkResourceExtension.getFlinkResource().startSqlGateway();
+                ClusterController ignore = 
flinkResourceExtension.getFlinkResource().startCluster(2)) {
 
             Map<String, String> sessionProperties = new HashMap<>();
             sessionProperties.put("workflow-scheduler.type", "embedded");
@@ -402,11 +407,11 @@ public class SqlGatewayE2ECase extends TestLogger {
 
     private FlinkDistribution.TestSqlGatewayRestClient 
initSessionWithCatalogStore(
             Map<String, String> extraProperties) throws Exception {
-        File catalogStoreFolder = FOLDER.newFolder();
+        Path catalogStoreFolder = Files.createTempDirectory(FOLDER, 
"catalogStore");
         Map<String, String> sessionProperties = new HashMap<>();
         sessionProperties.put("table.catalog-store.kind", "file");
         sessionProperties.put(
-                "table.catalog-store.file.path", 
catalogStoreFolder.getAbsolutePath());
+                "table.catalog-store.file.path", 
catalogStoreFolder.toAbsolutePath().toString());
         sessionProperties.putAll(extraProperties);
 
         FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
@@ -417,9 +422,10 @@ public class SqlGatewayE2ECase extends TestLogger {
                         sessionProperties);
 
         filesystemCatalogName = CATALOG_NAME_PREFIX + 
CATALOG_COUNTER.getAndAdd(1);
-        File catalogFolder = FOLDER.newFolder(filesystemCatalogName);
-        FOLDER.newFolder(
-                String.format("%s/%s", filesystemCatalogName, 
FILESYSTEM_DEFAULT_DATABASE));
+        Path catalogFolder = 
Files.createDirectory(FOLDER.resolve(filesystemCatalogName));
+        Files.createDirectory(
+                FOLDER.resolve(
+                        String.format("%s/%s", filesystemCatalogName, 
FILESYSTEM_DEFAULT_DATABASE)));
         String createCatalogDDL =
                 String.format(
                         "CREATE CATALOG %s WITH (\n"
@@ -427,7 +433,7 @@ public class SqlGatewayE2ECase extends TestLogger {
                                 + "  'default-database' = 'test_db',\n"
                                 + "  'path' = '%s'\n"
                                 + ")",
-                        filesystemCatalogName, 
catalogFolder.getAbsolutePath());
+                        filesystemCatalogName, catalogFolder.toAbsolutePath());
         gatewayRestClient.executeStatementWithResult(createCatalogDDL);
         gatewayRestClient.executeStatementWithResult(
                 String.format("USE CATALOG %s", filesystemCatalogName));
@@ -436,16 +442,16 @@ public class SqlGatewayE2ECase extends TestLogger {
     }
 
     private void executeStatement(SQLJobClientMode mode) throws Exception {
-        File result = FOLDER.newFolder(mode.getClass().getName() + ".csv");
-        try (GatewayController gateway = flinkResource.startSqlGateway();
-                ClusterController ignore = flinkResource.startCluster(1)) {
+        Path result = Files.createTempDirectory(FOLDER, 
mode.getClass().getName() + ".csv");
+        try (GatewayController gateway = 
flinkResourceExtension.getFlinkResource().startSqlGateway();
+                ClusterController ignore = 
flinkResourceExtension.getFlinkResource().startCluster(1)) {
             gateway.submitSQLJob(
-                    new 
SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result))
+                    new 
SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result.toFile()))
                             .setClientMode(mode)
                             .build(),
                     Duration.ofSeconds(60));
         }
-        assertEquals(Collections.singletonList("1"), 
readCsvResultFiles(result.toPath()));
+        assertThat(readCsvResultFiles(result)).containsExactly("1");
     }
 
     private static List<String> getSqlLines(File result) throws Exception {
@@ -480,7 +486,7 @@ public class SqlGatewayE2ECase extends TestLogger {
         }
         hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, 
HIVE_CONTAINER.getHiveMetastoreURI());
         try {
-            File site = FOLDER.newFile(HiveCatalog.HIVE_SITE_FILE);
+            File site = 
Files.createFile(FOLDER.resolve(HiveCatalog.HIVE_SITE_FILE)).toFile();
             try (OutputStream out = new FileOutputStream(site)) {
                 hiveConf.writeXml(out);
             }
@@ -495,7 +501,7 @@ public class SqlGatewayE2ECase extends TestLogger {
      * hadoop.classpath which contains all hadoop jars. It also moves planner 
to the lib and remove
      * the planner load to make the Hive sql connector works.
      */
-    private static FlinkResource buildFlinkResource() {
+    private static FlinkResourceExtension buildFlinkResource() {
         // add hive jar and planner jar
         FlinkResourceSetup.FlinkResourceSetupBuilder builder =
                 FlinkResourceSetup.builder()
@@ -536,7 +542,7 @@ public class SqlGatewayE2ECase extends TestLogger {
         ENDPOINT_CONFIG.addAll(Configuration.fromMap(endpointConfig));
         builder.addConfiguration(ENDPOINT_CONFIG);
 
-        return new 
LocalStandaloneFlinkResourceFactory().create(builder.build());
+        return new FlinkResourceExtension(new 
LocalStandaloneFlinkResourceFactory().create(builder.build()));
     }
 
     private static String getPrefixedConfigOptionName(ConfigOption<?> option) {
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
index 6fa9064fcfd..a908f270f78 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
@@ -25,7 +25,6 @@ import okhttp3.FormBody;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
-import org.junit.runner.Description;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.GenericContainer;
@@ -42,8 +41,8 @@ public class HiveContainer extends 
GenericContainer<HiveContainer> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveContainer.class);
 
-    public static final String HOST_NAME = "hadoop-master";
-    public static final int HIVE_METASTORE_PORT = 9083;
+    private static final String HOST_NAME = "hadoop-master";
+    private static final int HIVE_METASTORE_PORT = 9083;
     private static final int NAME_NODE_WEB_PORT = 50070;
 
     // Detailed log paths are from
@@ -96,9 +95,9 @@ public class HiveContainer extends 
GenericContainer<HiveContainer> {
     }
 
     @Override
-    protected void finished(Description description) {
+    protected void containerIsStopping(InspectContainerResponse containerInfo) 
{
         backupLogs();
-        super.finished(description);
+        super.containerIsStopping(containerInfo);
     }
 
     public String getHiveMetastoreURI() {
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java
deleted file mode 100644
index 2bb69bd333e..00000000000
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * Modified version of the jUnit {@link org.junit.rules.ExternalResource}.
- *
- * <p>This version is an interface instead of an abstract class and allows 
resources to
- * differentiate between successful and failed tests in their {@code After} 
methods.
- */
-public interface ExternalResource extends TestRule {
-
-    void before() throws Exception;
-
-    void afterTestSuccess();
-
-    default void afterTestFailure() {
-        afterTestSuccess();
-    }
-
-    @Override
-    default Statement apply(final Statement base, final Description 
description) {
-        return new Statement() {
-            @Override
-            public void evaluate() throws Throwable {
-                before();
-                try {
-                    base.evaluate();
-                } catch (final Throwable testThrowable) {
-                    try {
-                        afterTestFailure();
-                    } catch (final Throwable afterFailureThrowable) {
-                        testThrowable.addSuppressed(afterFailureThrowable);
-                    }
-                    throw testThrowable;
-                }
-                afterTestSuccess();
-            }
-        };
-    }
-}


Reply via email to