This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e3abe28f62 [FLINK-33024][table-planner][JUnit5 Migration] Module: flink-table-planner (JsonPlanTestBase) (#23353)
5e3abe28f62 is described below

commit 5e3abe28f62ff9c1b3de2a48f4474b616bda112a
Author: Jiabao Sun <[email protected]>
AuthorDate: Tue Sep 19 22:45:59 2023 -0500

    [FLINK-33024][table-planner][JUnit5 Migration] Module: flink-table-planner (JsonPlanTestBase) (#23353)
---
 .../apache/flink/table/api/CompiledPlanITCase.java | 65 ++++++++++++----------
 .../stream/jsonplan/CalcJsonPlanITCase.java        | 10 ++--
 .../jsonplan/ChangelogSourceJsonPlanITCase.java    | 12 ++--
 .../ConfigureOperatorLevelStateTtlJsonITCase.java  |  8 +--
 .../stream/jsonplan/CorrelateJsonPlanITCase.java   | 22 ++++----
 .../jsonplan/DeduplicationJsonPlanITCase.java      |  6 +-
 .../stream/jsonplan/ExpandJsonPlanITCase.java      |  6 +-
 .../jsonplan/GroupAggregateJsonPlanITCase.java     | 41 +++++++-------
 .../jsonplan/GroupWindowAggregateJsonITCase.java   | 17 +++---
 .../IncrementalAggregateJsonPlanITCase.java        | 14 ++---
 .../jsonplan/IntervalJoinJsonPlanITCase.java       |  8 +--
 .../stream/jsonplan/JoinJsonPlanITCase.java        | 22 ++++----
 .../stream/jsonplan/LimitJsonPlanITCase.java       |  6 +-
 .../stream/jsonplan/LookupJoinJsonPlanITCase.java  | 14 +++--
 .../jsonplan/MatchRecognizeJsonPlanITCase.java     |  8 +--
 .../jsonplan/OverAggregateJsonPlanITCase.java      | 10 ++--
 .../stream/jsonplan/RankJsonPlanITCase.java        |  8 +--
 .../stream/jsonplan/SargJsonPlanITCase.java        |  6 +-
 .../stream/jsonplan/SortLimitJsonPlanITCase.java   |  6 +-
 .../stream/jsonplan/TableSinkJsonPlanITCase.java   | 15 ++---
 .../stream/jsonplan/TableSourceJsonPlanITCase.java | 18 +++---
 .../jsonplan/TemporalJoinJsonPlanITCase.java       | 12 ++--
 .../stream/jsonplan/TemporalSortJsonITCase.java    |  8 +--
 .../stream/jsonplan/UnionJsonPlanITCase.java       |  6 +-
 .../stream/jsonplan/ValuesJsonPlanITCase.java      |  6 +-
 .../jsonplan/WatermarkAssignerJsonPlanITCase.java  | 10 ++--
 .../stream/jsonplan/WindowAggregateJsonITCase.java | 41 +++++++-------
 .../jsonplan/WindowDeduplicateJsonITCase.java      | 13 +++--
 .../stream/jsonplan/WindowJoinJsonITCase.java      | 13 +++--
 .../jsonplan/WindowTableFunctionJsonITCase.java    | 13 +++--
 .../table/planner/utils/JsonPlanTestBase.java      | 35 ++++++++----
 31 files changed, 258 insertions(+), 221 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 4ca0a5d038d..591f0a52e4a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -25,10 +25,11 @@ import 
org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.table.planner.utils.JsonTestUtils;
 import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
 import org.apache.commons.io.FileUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,15 +48,16 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link CompiledPlan} and related {@link TableEnvironment} 
methods. */
-public class CompiledPlanITCase extends JsonPlanTestBase {
+class CompiledPlanITCase extends JsonPlanTestBase {
 
     private static final List<String> DATA =
             Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
     private static final String[] COLUMNS_DEFINITION =
             new String[] {"a bigint", "b int", "c varchar"};
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
 
         String srcTableDdl =
@@ -76,7 +78,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompilePlanSql() throws IOException {
+    void testCompilePlanSql() throws IOException {
         CompiledPlan compiledPlan =
                 tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM 
MyTable");
         String expected = 
TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
@@ -92,7 +94,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testExecutePlanSql() throws Exception {
+    void testExecutePlanSql() throws Exception {
         File sinkPath = createSourceSinkTables();
 
         tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM 
src").execute().await();
@@ -101,10 +103,10 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testExecuteCtasPlanSql() throws Exception {
+    void testExecuteCtasPlanSql() throws Exception {
         createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
 
-        File sinkPath = TEMPORARY_FOLDER.newFolder();
+        File sinkPath = TempDirUtils.newFolder(tempFolder);
         assertThatThrownBy(
                         () ->
                                 tableEnv.compilePlanSql(
@@ -125,7 +127,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testExecutePlanTable() throws Exception {
+    void testExecutePlanTable() throws Exception {
         File sinkPath = createSourceSinkTables();
 
         
tableEnv.from("src").select($("*")).insertInto("sink").compilePlan().execute().await();
@@ -134,8 +136,9 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompileWriteToFileAndThenExecuteSql() throws Exception {
-        Path planPath = 
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+    void testCompileWriteToFileAndThenExecuteSql() throws Exception {
+        Path planPath =
+                Paths.get(TempDirUtils.newFolder(tempFolder, 
"plan").getPath(), "plan.json");
 
         File sinkPath = createSourceSinkTables();
 
@@ -148,8 +151,9 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws 
Exception {
-        Path planPath = 
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+    void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws 
Exception {
+        Path planPath =
+                Paths.get(TempDirUtils.newFolder(tempFolder, 
"plan").getPath(), "plan.json");
 
         File sinkPath = createSourceSinkTables();
 
@@ -165,9 +169,9 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompilePlan() throws Exception {
+    void testCompilePlan() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), 
"plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, 
"plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         File sinkPath = createSourceSinkTables();
@@ -195,9 +199,9 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompilePlanWithStatementSet() throws Exception {
+    void testCompilePlanWithStatementSet() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), 
"plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, 
"plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
@@ -226,9 +230,9 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompilePlanIfNotExists() throws Exception {
+    void testCompilePlanIfNotExists() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), 
"plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, 
"plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         File sinkPath = createSourceSinkTables();
@@ -256,11 +260,14 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompilePlanOverwrite() throws Exception {
+    void testCompilePlanOverwrite() throws Exception {
         tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, 
true);
 
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), 
"plan.json")
+                Paths.get(
+                                URI.create(TempDirUtils.newFolder(tempFolder, 
"plan").getPath())
+                                        .getPath(),
+                                "plan.json")
                         .toAbsolutePath();
 
         List<String> expectedData =
@@ -297,9 +304,9 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompileAndExecutePlan() throws Exception {
+    void testCompileAndExecutePlan() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), 
"plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, 
"plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         File sinkPath = createSourceSinkTables();
@@ -316,9 +323,9 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCompileAndExecutePlanWithStatementSet() throws Exception {
+    void testCompileAndExecutePlanWithStatementSet() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), 
"plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, 
"plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
@@ -344,7 +351,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testExplainPlan() throws IOException {
+    void testExplainPlan() throws IOException {
         String planFromResources =
                 JsonTestUtils.setFlinkVersion(
                                 
JsonTestUtils.readFromResource("/jsonplan/testGetJsonPlan.out"),
@@ -360,7 +367,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testPersistedConfigOption() throws Exception {
+    void testPersistedConfigOption() throws Exception {
         List<String> data =
                 Stream.concat(
                                 DATA.stream(),
@@ -397,7 +404,7 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testBatchMode() {
+    void testBatchMode() {
         tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
 
         String srcTableDdl =
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
index 8d03a24dd14..89aac5a5e3e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -33,10 +33,10 @@ import java.util.Collections;
 import java.util.List;
 
 /** Test for calc json plan. */
-public class CalcJsonPlanITCase extends JsonPlanTestBase {
+class CalcJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testSimpleCalc() throws Exception {
+    void testSimpleCalc() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello 
world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not 
null", "c varchar");
         File sinkPath =
@@ -55,7 +55,7 @@ public class CalcJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testCalcWithUdf() throws Exception {
+    void testCalcWithUdf() throws Exception {
         tableEnv.createTemporaryFunction("udf1", new JavaFunc0());
         tableEnv.createTemporarySystemFunction("udf2", new JavaFunc2());
         tableEnv.createFunction("udf3", UdfWithOpen.class);
@@ -86,7 +86,7 @@ public class CalcJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testProjectPushDown() throws Exception {
+    void testProjectPushDown() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello 
world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not 
null", "c varchar");
         File sinkPath = createTestCsvSinkTable("MySink", "b int", "a bigint", 
"a1 varchar");
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
index 833ba3fce08..5ceb0bd040c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,10 +31,10 @@ import java.util.List;
 import java.util.Map;
 
 /** Integration tests for operations on changelog source, including upsert 
source. */
-public class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
+class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testChangelogSource() throws Exception {
+    void testChangelogSource() throws Exception {
         registerChangelogSource();
         createTestNonInsertOnlyValuesSinkTable(
                 "user_sink",
@@ -56,7 +56,7 @@ public class ChangelogSourceJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testToUpsertSource() throws Exception {
+    void testToUpsertSource() throws Exception {
         registerUpsertSource();
         createTestNonInsertOnlyValuesSinkTable(
                 "user_sink",
@@ -79,7 +79,7 @@ public class ChangelogSourceJsonPlanITCase extends 
JsonPlanTestBase {
 
     // 
------------------------------------------------------------------------------------------
 
-    public void registerChangelogSource() {
+    protected void registerChangelogSource() {
         Map<String, String> properties = new HashMap<>();
         properties.put("changelog-mode", "I,UA,UB,D");
         createTestValuesSourceTable(
@@ -95,7 +95,7 @@ public class ChangelogSourceJsonPlanITCase extends 
JsonPlanTestBase {
                 properties);
     }
 
-    public void registerUpsertSource() {
+    protected void registerUpsertSource() {
         Map<String, String> properties = new HashMap<>();
         properties.put("changelog-mode", "I,UA,D");
         createTestValuesSourceTable(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
index 6d1a22003a1..fb879bf10c5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.utils.JsonTestUtils;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -39,10 +39,10 @@ import java.util.Map;
  * Tests for configuring operator-level state TTL via {@link
  * org.apache.flink.table.api.CompiledPlan}.
  */
-public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase 
{
+class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
 
     @Test
-    public void testDifferentStateTtlForDifferentOneInputOperator() throws 
Exception {
+    void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
         String dataId =
                 TestValuesTableFactory.registerRowData(
                         Arrays.asList(
@@ -117,7 +117,7 @@ public class ConfigureOperatorLevelStateTtlJsonITCase 
extends JsonPlanTestBase {
     }
 
     @Test
-    public void testDifferentStateTtlForSameTwoInputStreamOperator() throws 
Exception {
+    void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception 
{
         String leftTableDataId =
                 TestValuesTableFactory.registerRowData(
                         Arrays.asList(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
index 1874771514f..a90c3cd86cb 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunction
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,16 +32,16 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Integration tests for correlate. */
-public class CorrelateJsonPlanITCase extends JsonPlanTestBase {
+class CorrelateJsonPlanITCase extends JsonPlanTestBase {
 
-    @Before
-    public void before() {
+    @BeforeEach
+    void before() {
         List<Row> data = Collections.singletonList(Row.of("1,1,hi"));
         createTestValuesSourceTable("MyTable", data, "a varchar");
     }
 
     @Test
-    public void testSystemFuncByObject() throws ExecutionException, 
InterruptedException {
+    void testSystemFuncByObject() throws ExecutionException, 
InterruptedException {
         tableEnv.createTemporarySystemFunction(
                 "STRING_SPLIT", new 
JavaUserDefinedTableFunctions.StringSplit());
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -53,7 +53,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase 
{
     }
 
     @Test
-    public void testSystemFuncByClass() throws ExecutionException, 
InterruptedException {
+    void testSystemFuncByClass() throws ExecutionException, 
InterruptedException {
         tableEnv.createTemporarySystemFunction(
                 "STRING_SPLIT", 
JavaUserDefinedTableFunctions.StringSplit.class);
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -65,7 +65,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase 
{
     }
 
     @Test
-    public void testTemporaryFuncByObject() throws ExecutionException, 
InterruptedException {
+    void testTemporaryFuncByObject() throws ExecutionException, 
InterruptedException {
         tableEnv.createTemporaryFunction(
                 "STRING_SPLIT", new 
JavaUserDefinedTableFunctions.StringSplit());
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -77,7 +77,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase 
{
     }
 
     @Test
-    public void testTemporaryFuncByClass() throws ExecutionException, 
InterruptedException {
+    void testTemporaryFuncByClass() throws ExecutionException, 
InterruptedException {
         tableEnv.createTemporaryFunction(
                 "STRING_SPLIT", 
JavaUserDefinedTableFunctions.StringSplit.class);
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -89,7 +89,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase 
{
     }
 
     @Test
-    public void testFilter() throws ExecutionException, InterruptedException {
+    void testFilter() throws ExecutionException, InterruptedException {
         tableEnv.createTemporarySystemFunction(
                 "STRING_SPLIT", new 
JavaUserDefinedTableFunctions.StringSplit());
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -103,7 +103,7 @@ public class CorrelateJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testUnnest() throws ExecutionException, InterruptedException {
+    void testUnnest() throws ExecutionException, InterruptedException {
         List<Row> data =
                 Collections.singletonList(
                         Row.of("Bob", new Row[] {Row.of("1"), Row.of("2"), 
Row.of("3")}));
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
index d095ae00cd1..afb9f5e2d5c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
@@ -24,16 +24,16 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for deduplication json plan. */
-public class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
+class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testDeduplication() throws Exception {
+    void testDeduplication() throws Exception {
         List<Row> data =
                 Arrays.asList(
                         Row.of(1L, "terry", "pen", 1000L),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
index 4ebf82e1876..1226dac39ca 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
@@ -26,16 +26,16 @@ import 
org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for expand json plan. */
-public class ExpandJsonPlanITCase extends JsonPlanTestBase {
+class ExpandJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testExpand() throws Exception {
+    void testExpand() throws Exception {
         tableEnv.getConfig()
                 .set(
                         
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
index 345047861c8..367253eea76 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
@@ -27,29 +27,32 @@ import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.Wei
 import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for group aggregate json plan. */
-@RunWith(Parameterized.class)
-public class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
 
-    @Parameterized.Parameter public boolean isMiniBatchEnabled;
+    @Parameter private boolean isMiniBatchEnabled;
 
-    @Parameterized.Parameters(name = "isMiniBatchEnabled={0}")
-    public static List<Boolean> testData() {
+    @Parameters(name = "isMiniBatchEnabled={0}")
+    private static List<Boolean> testData() {
         return Arrays.asList(true, false);
     }
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         if (isMiniBatchEnabled) {
             tableEnv.getConfig()
@@ -63,8 +66,8 @@ public class GroupAggregateJsonPlanITCase extends 
JsonPlanTestBase {
         }
     }
 
-    @Test
-    public void testSimpleAggCallsWithGroupBy() throws Exception {
+    @TestTemplate
+    void testSimpleAggCallsWithGroupBy() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -90,8 +93,8 @@ public class GroupAggregateJsonPlanITCase extends 
JsonPlanTestBase {
         assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, 
Hello]"), result);
     }
 
-    @Test
-    public void testDistinctAggCalls() throws Exception {
+    @TestTemplate
+    void testDistinctAggCalls() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data2()),
@@ -130,8 +133,8 @@ public class GroupAggregateJsonPlanITCase extends 
JsonPlanTestBase {
                 result);
     }
 
-    @Test
-    public void testUserDefinedAggCallsWithoutMerge() throws Exception {
+    @TestTemplate
+    void testUserDefinedAggCallsWithoutMerge() throws Exception {
         tableEnv.createTemporaryFunction("my_sum1", new VarSum1AggFunction());
         tableEnv.createFunction("my_avg", WeightedAvg.class);
         tableEnv.createTemporarySystemFunction("my_sum2", 
VarSum2AggFunction.class);
@@ -166,8 +169,8 @@ public class GroupAggregateJsonPlanITCase extends 
JsonPlanTestBase {
                 Arrays.asList("+I[1, 77, 0, 1]", "+I[2, 120, 0, 2]", "+I[3, 
58, 0, 3]"), result);
     }
 
-    @Test
-    public void testUserDefinedAggCallsWithMerge() throws Exception {
+    @TestTemplate
+    void testUserDefinedAggCallsWithMerge() throws Exception {
         tableEnv.createFunction("my_avg", 
JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class);
         tableEnv.createTemporarySystemFunction(
                 "my_concat_agg", 
JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
index 71e99279cbd..f461640b3e0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
@@ -23,18 +23,19 @@ import 
org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for group window aggregate json plan. */
-public class GroupWindowAggregateJsonITCase extends JsonPlanTestBase {
+class GroupWindowAggregateJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -59,7 +60,7 @@ public class GroupWindowAggregateJsonITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "name STRING",
@@ -93,7 +94,7 @@ public class GroupWindowAggregateJsonITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testEventTimeHopWindow() throws Exception {
+    void testEventTimeHopWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
@@ -121,7 +122,7 @@ public class GroupWindowAggregateJsonITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testEventTimeSessionWindow() throws Exception {
+    void testEventTimeSessionWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
index ea768397a51..8888bca4034 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
@@ -27,8 +27,8 @@ import 
org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -37,10 +37,11 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Test for incremental aggregate json plan. */
-public class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
+class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         tableEnv.getConfig()
                 .set(
@@ -56,8 +57,7 @@ public class IncrementalAggregateJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testIncrementalAggregate()
-            throws IOException, ExecutionException, InterruptedException {
+    void testIncrementalAggregate() throws IOException, ExecutionException, 
InterruptedException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
index def8c315563..94a1e41b2ac 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
@@ -22,17 +22,17 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for IntervalJoin json plan. */
-public class IntervalJoinJsonPlanITCase extends JsonPlanTestBase {
+class IntervalJoinJsonPlanITCase extends JsonPlanTestBase {
 
     /** test process time inner join. * */
     @Test
-    public void testProcessTimeInnerJoin() throws Exception {
+    void testProcessTimeInnerJoin() throws Exception {
         List<Row> rowT1 =
                 Arrays.asList(
                         Row.of(1, 1L, "Hi1"),
@@ -69,7 +69,7 @@ public class IntervalJoinJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testRowTimeInnerJoin() throws Exception {
+    void testRowTimeInnerJoin() throws Exception {
         List<Row> rowT1 =
                 Arrays.asList(
                         Row.of(1, 1L, "Hi1"),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
index 7fc86c5a6d1..a722226e044 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
@@ -23,19 +23,19 @@ import 
org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for join json plan. */
-public class JoinJsonPlanITCase extends JsonPlanTestBase {
+class JoinJsonPlanITCase extends JsonPlanTestBase {
 
     @Override
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "A",
@@ -55,7 +55,7 @@ public class JoinJsonPlanITCase extends JsonPlanTestBase {
 
     /** test non-window inner join. * */
     @Test
-    public void testNonWindowInnerJoin() throws Exception {
+    void testNonWindowInnerJoin() throws Exception {
         List<String> dataT1 =
                 Arrays.asList(
                         "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", 
"1,9,Hi6", "1,8,Hi8",
@@ -88,7 +88,7 @@ public class JoinJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testIsNullInnerJoinWithNullCond() throws Exception {
+    void testIsNullInnerJoinWithNullCond() throws Exception {
         List<String> dataT1 =
                 Arrays.asList(
                         "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", 
"1,9,Hi6", "1,8,Hi8",
@@ -125,7 +125,7 @@ public class JoinJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    void testJoin() throws Exception {
         createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
         compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a3, b4 FROM 
A, B WHERE a2 = b2")
                 .await();
@@ -136,7 +136,7 @@ public class JoinJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testInnerJoin() throws Exception {
+    void testInnerJoin() throws Exception {
         createTestValuesSinkTable("MySink", "a1 int", "b1 int");
         compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a1, b1 FROM 
A JOIN B ON a1 = b1")
                 .await();
@@ -145,7 +145,7 @@ public class JoinJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testJoinWithFilter() throws Exception {
+    void testJoinWithFilter() throws Exception {
         createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
         compileSqlAndExecutePlan(
                         "insert into MySink \n"
@@ -156,7 +156,7 @@ public class JoinJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testInnerJoinWithDuplicateKey() throws Exception {
+    void testInnerJoinWithDuplicateKey() throws Exception {
         createTestValuesSinkTable("MySink", "a1 int", "b1 int", "b3 int");
         compileSqlAndExecutePlan(
                         "insert into MySink \n"
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
index 50b4f5d44f7..6507481d638 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -31,9 +31,9 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Test for limit JsonPlan ser/de. */
-public class LimitJsonPlanITCase extends JsonPlanTestBase {
+class LimitJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testLimit() throws ExecutionException, InterruptedException, 
IOException {
+    void testLimit() throws ExecutionException, InterruptedException, 
IOException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
index c24a6191c91..71e76e0e218 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
@@ -22,17 +22,19 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for LookupJoin json plan. */
-public class LookupJoinJsonPlanITCase extends JsonPlanTestBase {
+class LookupJoinJsonPlanITCase extends JsonPlanTestBase {
 
+    @BeforeEach
     @Override
-    public void setup() throws Exception {
+    protected void setup() throws Exception {
         super.setup();
         List<Row> rowT1 =
                 Arrays.asList(
@@ -66,7 +68,7 @@ public class LookupJoinJsonPlanITCase extends 
JsonPlanTestBase {
 
     /** test join temporal table. * */
     @Test
-    public void testJoinLookupTable() throws Exception {
+    void testJoinLookupTable() throws Exception {
         compileSqlAndExecutePlan(
                         "insert into MySink "
                                 + "SELECT T.id, T.len, T.content, D.name FROM 
src AS T JOIN user_table \n"
@@ -81,7 +83,7 @@ public class LookupJoinJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testJoinLookupTableWithPushDown() throws Exception {
+    void testJoinLookupTableWithPushDown() throws Exception {
         compileSqlAndExecutePlan(
                         "insert into MySink \n"
                                 + "SELECT T.id, T.len, T.content, D.name FROM 
src AS T JOIN user_table \n "
@@ -93,7 +95,7 @@ public class LookupJoinJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testLeftJoinLookupTableWithPreFilter() throws Exception {
+    void testLeftJoinLookupTableWithPreFilter() throws Exception {
         compileSqlAndExecutePlan(
                         "insert into MySink "
                                 + "SELECT T.id, T.len, T.content, D.name FROM 
src AS T LEFT JOIN user_table \n"
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
index 6f68c5c287a..ca64b2126df 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
@@ -22,16 +22,16 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 /** Test json deserialization for match recognize. */
-public class MatchRecognizeJsonPlanITCase extends JsonPlanTestBase {
+class MatchRecognizeJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testSimpleMatch() throws Exception {
+    void testSimpleMatch() throws Exception {
         List<Row> data =
                 Arrays.asList(
                         Row.of(1L, "a"),
@@ -70,7 +70,7 @@ public class MatchRecognizeJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testComplexMatch() throws Exception {
+    void testComplexMatch() throws Exception {
         List<Row> data =
                 Arrays.asList(
                         Row.of("ACME", 1L, 19, 1),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
index 0aaf882bc4c..55fc7a0a8d2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -34,10 +34,10 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Test json deserialization for over aggregate. */
-public class OverAggregateJsonPlanITCase extends JsonPlanTestBase {
+class OverAggregateJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testProcTimeBoundedPartitionedRowsOver()
+    void testProcTimeBoundedPartitionedRowsOver()
             throws ExecutionException, InterruptedException, IOException {
         createTestValuesSourceTable(
                 "MyTable",
@@ -79,7 +79,7 @@ public class OverAggregateJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testProcTimeUnboundedNonPartitionedRangeOver()
+    void testProcTimeUnboundedNonPartitionedRangeOver()
             throws IOException, ExecutionException, InterruptedException {
         List<Row> data =
                 Arrays.asList(
@@ -124,7 +124,7 @@ public class OverAggregateJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testRowTimeBoundedPartitionedRangeOver()
+    void testRowTimeBoundedPartitionedRangeOver()
             throws IOException, ExecutionException, InterruptedException {
         List<Row> data =
                 Arrays.asList(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
index 8397a4035e9..1b866271787 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -31,9 +31,9 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Test for Rank JsonPlan ser/de. */
-public class RankJsonPlanITCase extends JsonPlanTestBase {
+class RankJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testRank() throws ExecutionException, InterruptedException, 
IOException {
+    void testRank() throws ExecutionException, InterruptedException, 
IOException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
@@ -52,7 +52,7 @@ public class RankJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testFirstN() throws ExecutionException, InterruptedException, 
IOException {
+    void testFirstN() throws ExecutionException, InterruptedException, 
IOException {
         createTestValuesSourceTable(
                 "MyTable1",
                 JavaScalaConversionUtil.toJava(TestData.data4()),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
index 1320fa24920..bb40b5ab181 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
@@ -22,16 +22,16 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Test for Sarg JsonPlan ser/de. */
-public class SargJsonPlanITCase extends JsonPlanTestBase {
+class SargJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testSarg() throws ExecutionException, InterruptedException {
+    void testSarg() throws ExecutionException, InterruptedException {
         List<Row> data =
                 Arrays.asList(Row.of(1), Row.of(2), Row.of((Integer) null), 
Row.of(4), Row.of(5));
         createTestValuesSourceTable("MyTable", data, "a int");
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
index bd1f168b4ab..7473057ffc4 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -31,9 +31,9 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Test for sort limit JsonPlan ser/de. */
-public class SortLimitJsonPlanITCase extends JsonPlanTestBase {
+class SortLimitJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testSortLimit() throws ExecutionException, 
InterruptedException, IOException {
+    void testSortLimit() throws ExecutionException, InterruptedException, 
IOException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
index 3a4a6cc8eee..73577d0775b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
@@ -21,8 +21,8 @@ package 
org.apache.flink.table.planner.runtime.stream.jsonplan;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -30,18 +30,19 @@ import java.util.HashMap;
 import java.util.List;
 
 /** Test for table sink json plan. */
-public class TableSinkJsonPlanITCase extends JsonPlanTestBase {
+class TableSinkJsonPlanITCase extends JsonPlanTestBase {
 
     List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello 
world");
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int", "c 
varchar");
     }
 
     @Test
-    public void testPartitioning() throws Exception {
+    void testPartitioning() throws Exception {
         File sinkPath =
                 createTestCsvSinkTable(
                         "MySink",
@@ -55,7 +56,7 @@ public class TableSinkJsonPlanITCase extends JsonPlanTestBase 
{
     }
 
     @Test
-    public void testWritingMetadata() throws Exception {
+    void testWritingMetadata() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 new String[] {"a bigint", "b int", "c varchar METADATA"},
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
index 6a24ee079a5..b757407c0c7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -32,10 +32,10 @@ import java.util.List;
 import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
 
 /** Test for table source json plan. */
-public class TableSourceJsonPlanITCase extends JsonPlanTestBase {
+class TableSourceJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testProjectPushDown() throws Exception {
+    void testProjectPushDown() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello 
world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not 
null", "c varchar");
         File sinkPath = createTestCsvSinkTable("MySink", "a bigint", "b int");
@@ -46,7 +46,7 @@ public class TableSourceJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testReadingMetadata() throws Exception {
+    void testReadingMetadata() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -65,7 +65,7 @@ public class TableSourceJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testReadingMetadataWithProjectionPushDownDisabled() throws 
Exception {
+    void testReadingMetadataWithProjectionPushDownDisabled() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -85,7 +85,7 @@ public class TableSourceJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testFilterPushDown() throws Exception {
+    void testFilterPushDown() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello 
world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not 
null", "c varchar");
         File sinkPath = createTestCsvSinkTable("MySink", "a bigint", "b int", 
"c varchar");
@@ -96,7 +96,7 @@ public class TableSourceJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testPartitionPushDown() throws Exception {
+    void testPartitionPushDown() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -115,7 +115,7 @@ public class TableSourceJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testWatermarkPushDown() throws Exception {
+    void testWatermarkPushDown() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
@@ -146,7 +146,7 @@ public class TableSourceJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testPushDowns() throws Exception {
+    void testPushDowns() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
index ed57d92a422..56ebcf55947 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
@@ -23,7 +23,8 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
@@ -31,10 +32,11 @@ import java.util.List;
 import static org.apache.flink.table.api.Expressions.$;
 
 /** Test for TemporalJoin json plan. */
-public class TemporalJoinJsonPlanITCase extends JsonPlanTestBase {
+class TemporalJoinJsonPlanITCase extends JsonPlanTestBase {
 
+    @BeforeEach
     @Override
-    public void setup() throws Exception {
+    protected void setup() throws Exception {
         super.setup();
         List<Row> orders =
                 Arrays.asList(
@@ -78,7 +80,7 @@ public class TemporalJoinJsonPlanITCase extends 
JsonPlanTestBase {
 
     /** test process time inner join. * */
     @Test
-    public void testJoinTemporalFunction() throws Exception {
+    void testJoinTemporalFunction() throws Exception {
         compileSqlAndExecutePlan(
                         "INSERT INTO MySink "
                                 + "SELECT amount * r.rate "
@@ -91,7 +93,7 @@ public class TemporalJoinJsonPlanITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    public void testTemporalTableJoin() throws Exception {
+    void testTemporalTableJoin() throws Exception {
         compileSqlAndExecutePlan(
                         "INSERT INTO MySink "
                                 + "SELECT amount * r.rate "
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
index 888e606fe85..a97a90bf003 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,10 +31,10 @@ import java.util.HashMap;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for temporal sort json plan. */
-public class TemporalSortJsonITCase extends JsonPlanTestBase {
+class TemporalSortJsonITCase extends JsonPlanTestBase {
 
     @Test
-    public void testSortProcessingTime() throws Exception {
+    void testSortProcessingTime() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -52,7 +52,7 @@ public class TemporalSortJsonITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testSortRowTime() throws Exception {
+    void testSortRowTime() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 
JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
index 8cc6ce34a87..1d7096a8518 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
@@ -23,15 +23,15 @@ import 
org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test json serialization/deserialization for union. */
-public class UnionJsonPlanITCase extends JsonPlanTestBase {
+class UnionJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testUnion() throws Exception {
+    void testUnion() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
index eb967590459..e71546fbec6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
@@ -21,16 +21,16 @@ package org.apache.flink.table.planner.runtime.stream.jsonplan;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for values json plan. */
-public class ValuesJsonPlanITCase extends JsonPlanTestBase {
+class ValuesJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testValues() throws Exception {
+    void testValues() throws Exception {
         createTestValuesSinkTable("MySink", "b INT", "a INT", "c VARCHAR");
         compileSqlAndExecutePlan(
                         "INSERT INTO MySink SELECT * from (VALUES (1, 2, 'Hi'), (3, 4, 'Hello'))")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
index 4ac0a181860..bfc32e880a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -31,10 +31,10 @@ import java.util.HashMap;
 import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
 
 /** Test for watermark assigner json plan. */
-public class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
+class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testWatermarkAssigner() throws Exception {
+    void testWatermarkAssigner() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
@@ -65,7 +65,7 @@ public class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testWatermarkPushDownWithMetadata() throws Exception {
+    void testWatermarkPushDownWithMetadata() throws Exception {
         // to verify FLINK-30598: the case declares metadata field first, without the fix it'll get
         // wrong code generated by WatermarkGeneratorCodeGenerator which reference the incorrect
         // varchar column as the watermark field.
@@ -102,7 +102,7 @@ public class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testWatermarkAndProjectPushDownWithMetadata() throws Exception {
+    void testWatermarkAndProjectPushDownWithMetadata() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
index 6e7e8487dfa..dc07965637c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
@@ -24,32 +24,35 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window aggregate json plan. */
-@RunWith(Parameterized.class)
-public class WindowAggregateJsonITCase extends JsonPlanTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+class WindowAggregateJsonITCase extends JsonPlanTestBase {
 
-    @Parameterized.Parameters(name = "agg_phase = {0}")
-    public static Object[] parameters() {
+    @Parameters(name = "agg_phase = {0}")
+    private static Object[] parameters() {
         return new Object[][] {
             new Object[] {AggregatePhaseStrategy.ONE_PHASE},
             new Object[] {AggregatePhaseStrategy.TWO_PHASE}
         };
     }
 
-    @Parameterized.Parameter public AggregatePhaseStrategy aggPhase;
+    @Parameter private AggregatePhaseStrategy aggPhase;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -77,8 +80,8 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
                         aggPhase.toString());
     }
 
-    @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    @TestTemplate
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "name STRING",
@@ -112,8 +115,8 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
                 result);
     }
 
-    @Test
-    public void testEventTimeHopWindow() throws Exception {
+    @TestTemplate
+    void testEventTimeHopWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
@@ -141,8 +144,8 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
                 result);
     }
 
-    @Test
-    public void testEventTimeCumulateWindow() throws Exception {
+    @TestTemplate
+    void testEventTimeCumulateWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
@@ -177,8 +180,8 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
                 result);
     }
 
-    @Test
-    public void testDistinctSplitEnabled() throws Exception {
+    @TestTemplate
+    void testDistinctSplitEnabled() throws Exception {
         tableEnv.getConfig()
                 .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         createTestValuesSinkTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
index a377dc44678..75eb714c9df 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
@@ -23,18 +23,19 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window deduplicate json plan. */
-public class WindowDeduplicateJsonITCase extends JsonPlanTestBase {
+class WindowDeduplicateJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -59,7 +60,7 @@ public class WindowDeduplicateJsonITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "ts STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
index 7e7565c7e23..c4e84d96b32 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
@@ -23,18 +23,19 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window join json plan. */
-public class WindowJoinJsonITCase extends JsonPlanTestBase {
+class WindowJoinJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -80,7 +81,7 @@ public class WindowJoinJsonITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "name STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
index e5489b8307b..1ccf0e03da1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
@@ -23,18 +23,19 @@ import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window deduplicate json plan. */
-public class WindowTableFunctionJsonITCase extends JsonPlanTestBase {
+class WindowTableFunctionJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -59,7 +60,7 @@ public class WindowTableFunctionJsonITCase extends JsonPlanTestBase {
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "ts STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
index 961dc62482c..6d6dbcf4386 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.utils;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -27,18 +28,22 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,17 +58,27 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The base class for json plan testing. */
-public abstract class JsonPlanTestBase extends AbstractTestBase {
+public abstract class JsonPlanTestBase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
+
+    @TempDir protected Path tempFolder;
 
     protected TableEnvironment tableEnv;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    protected void setup() throws Exception {
         tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
     }
 
-    @After
-    public void after() {
+    @AfterEach
+    protected void after() {
         TestValuesTableFactory.clearAllData();
     }
 
@@ -216,7 +231,7 @@ public abstract class JsonPlanTestBase extends AbstractTestBase {
     protected void createTestCsvSourceTable(
             String tableName, List<String> data, String... fieldNameAndTypes) throws IOException {
         checkArgument(fieldNameAndTypes.length > 0);
-        File sourceFile = TEMPORARY_FOLDER.newFile();
+        File sourceFile = TempDirUtils.newFile(tempFolder);
         Collections.shuffle(data);
         Files.write(sourceFile.toPath(), String.join("\n", data).getBytes());
         String ddl =
@@ -246,7 +261,7 @@ public abstract class JsonPlanTestBase extends AbstractTestBase {
                 StringUtils.isNullOrWhitespaceOnly(partitionFields)
                         ? ""
                         : "\n partitioned by (" + partitionFields + ") \n";
-        File sinkPath = TEMPORARY_FOLDER.newFolder();
+        File sinkPath = TempDirUtils.newFolder(tempFolder);
         String ddl =
                 String.format(
                         "CREATE TABLE %s (\n"

Reply via email to