This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70ac66d567e483b084d528a60f5153aa38a19dbf
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri May 12 16:03:07 2023 +0800

    [FLINK-32058][tests] Migrate the subclasses of BatchAbstractTestBase in runtime.batch.sql to JUnit5
---
 .../planner/runtime/batch/sql/CalcITCase.scala     |  76 +++++-----
 .../runtime/batch/sql/CodeSplitITCase.scala        |  23 +--
 .../runtime/batch/sql/CorrelateITCase.scala        |   6 +-
 .../runtime/batch/sql/CorrelateITCase2.scala       |   6 +-
 .../runtime/batch/sql/LegacyLimitITCase.scala      |   5 +-
 .../batch/sql/LegacyTableSourceITCase.scala        |  10 +-
 .../runtime/batch/sql/Limit0RemoveITCase.scala     |   6 +-
 .../planner/runtime/batch/sql/LimitITCase.scala    |   4 +
 .../planner/runtime/batch/sql/MiscITCase.scala     | 157 +++++++++++----------
 .../runtime/batch/sql/MultipleInputITCase.scala    |  35 ++---
 .../runtime/batch/sql/OverAggregateITCase.scala    |   7 +-
 .../batch/sql/PartitionableSinkITCase.scala        | 103 +++++++-------
 .../batch/sql/PartitionableSourceITCase.scala      |  33 +++--
 .../planner/runtime/batch/sql/RankITCase.scala     |   6 +-
 .../runtime/batch/sql/SetOperatorsITCase.scala     |  40 +++---
 .../runtime/batch/sql/SortLimitITCase.scala        |   4 +-
 .../runtime/batch/sql/TableScanITCase.scala        |   2 +-
 .../runtime/batch/sql/TableSinkITCase.scala        |  50 +++----
 .../runtime/batch/sql/TableSourceITCase.scala      |  14 +-
 .../planner/runtime/batch/sql/UnionITCase.scala    |   6 +-
 .../planner/runtime/batch/sql/UnnestITCase.scala   |   3 +-
 .../planner/runtime/batch/sql/ValuesITCase.scala   |   2 +-
 .../batch/sql/WindowTableFunctionITCase.scala      |   4 +-
 23 files changed, 301 insertions(+), 301 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 983cefc80c8..a0849349ace 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -44,9 +44,8 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime
 import org.apache.flink.types.Row
 
-import org.junit._
-import org.junit.Assert.assertEquals
-import org.junit.rules.ExpectedException
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.junit.jupiter.api.{BeforeEach, Disabled, Test}
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Time, Timestamp}
@@ -55,12 +54,7 @@ import java.util
 
 class CalcITCase extends BatchTestBase {
 
-  var _expectedEx: ExpectedException = ExpectedException.none
-
-  @Rule
-  def expectedEx: ExpectedException = _expectedEx
-
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
@@ -398,9 +392,10 @@ class CalcITCase extends BatchTestBase {
     checkResult("SELECT `1-_./Ü`, b, c FROM (SELECT a as `1-_./Ü`, b, c FROM 
Table3)", data3)
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidFields(): Unit = {
-    checkResult("SELECT a, foo FROM Table3", data3)
+    assertThatThrownBy(() => checkResult("SELECT a, foo FROM Table3", data3))
+      .isInstanceOf(classOf[ValidationException])
   }
 
   @Test
@@ -1032,12 +1027,11 @@ class CalcITCase extends BatchTestBase {
 
   @Test
   def testMapTypeGroupBy(): Unit = {
-    _expectedEx.expectMessage(
-      "Type(MAP<INT NOT NULL, VARCHAR(5) NOT NULL> NOT NULL) is not an 
orderable data type, it is not supported as a ORDER_BY/GROUP_BY/JOIN_EQUAL 
field.")
-    checkResult(
-      "SELECT COUNT(*) FROM SmallTable3 GROUP BY MAP[1, 'Hello', 2, 'Hi']",
-      Seq()
-    )
+    assertThatThrownBy(
+      () =>
+        checkResult("SELECT COUNT(*) FROM SmallTable3 GROUP BY MAP[1, 'Hello', 
2, 'Hi']", Seq()))
+      .hasMessage(
+        "Type(MAP<INT NOT NULL, VARCHAR(5) NOT NULL> NOT NULL) is not an 
orderable data type, it is not supported as a ORDER_BY/GROUP_BY/JOIN_EQUAL 
field.")
   }
 
   @Test
@@ -1058,16 +1052,17 @@ class CalcITCase extends BatchTestBase {
     val result = executeQuery(table)
 
     val nestedRow = result.head.getField(0).asInstanceOf[Row]
-    assertEquals(data.head.getField(0), nestedRow.getField(0))
-    assertEquals(data.head.getField(1), nestedRow.getField(1))
-    assertEquals(data.head.getField(2), nestedRow.getField(2))
+    assertThat(data.head.getField(0)).isEqualTo(nestedRow.getField(0))
+    assertThat(data.head.getField(1)).isEqualTo(nestedRow.getField(1))
+    assertThat(data.head.getField(2)).isEqualTo(nestedRow.getField(2))
 
     val arr = result.head.getField(1).asInstanceOf[Array[Integer]]
-    assertEquals(12, arr(0))
-    assertEquals(data.head.getField(1), arr(1))
+    assertThat(12).isEqualTo(arr(0))
+    assertThat(data.head.getField(1)).isEqualTo(arr(1))
 
     val hashMap = result.head.getField(2).asInstanceOf[util.HashMap[String, 
Timestamp]]
-    assertEquals(data.head.getField(2), 
hashMap.get(data.head.getField(0).asInstanceOf[String]))
+    assertThat(data.head.getField(2))
+      .isEqualTo(hashMap.get(data.head.getField(0).asInstanceOf[String]))
   }
 
   @Test
@@ -1144,7 +1139,7 @@ class CalcITCase extends BatchTestBase {
     )
   }
 
-  @Ignore // TODO support Unicode
+  @Disabled // TODO support Unicode
   @Test
   def testFunctionWithUnicodeParameters(): Unit = {
     val data = List(
@@ -1326,7 +1321,7 @@ class CalcITCase extends BatchTestBase {
     val d1 =
       
LocalDateConverter.INSTANCE.toInternal(result.toList.head.getField(0).asInstanceOf[LocalDate])
 
-    Assert.assertTrue(d0 <= d1 && d1 - d0 <= 1)
+    assertThat(d0 <= d1 && d1 - d0 <= 1).isTrue
   }
 
   @Test
@@ -1346,7 +1341,7 @@ class CalcITCase extends BatchTestBase {
 
     val ts2 = System.currentTimeMillis()
 
-    Assert.assertTrue(ts0 <= ts1 && ts1 <= ts2)
+    assertThat(ts0 <= ts1 && ts1 <= ts2).isTrue
   }
 
   @Test
@@ -1534,7 +1529,7 @@ class CalcITCase extends BatchTestBase {
     )
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test
   def testOrderByBinary(): Unit = {
     registerCollection(
       "BinaryT",
@@ -1550,18 +1545,21 @@ class CalcITCase extends BatchTestBase {
     )
     
tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
Int.box(1))
     tableConfig.set(BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, 
Boolean.box(true))
-    checkResult(
-      "select * from BinaryT order by c",
-      nullData3
-        .sortBy((x: Row) => x.getField(2).asInstanceOf[String])
-        .map(
-          r =>
-            row(
-              r.getField(0),
-              r.getField(1),
-              r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
-      isSorted = true
-    )
+
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "select * from BinaryT order by c",
+          nullData3
+            .sortBy((x: Row) => x.getField(2).asInstanceOf[String])
+            .map(
+              r =>
+                row(
+                  r.getField(0),
+                  r.getField(1),
+                  r.getField(2).toString.getBytes(StandardCharsets.UTF_8))),
+          isSorted = true
+        )).isInstanceOf(classOf[UnsupportedOperationException])
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
index 077978651f5..74384b4cf0d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
@@ -24,13 +24,13 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import 
org.apache.flink.table.planner.runtime.utils.TestData.{nullablesOfData3, 
smallData3, type3}
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
-
-import scala.collection.Seq
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.IterableAssert.assertThatIterable
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class CodeSplitITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("SmallTable3", smallData3, type3, "a, b, c", 
nullablesOfData3)
@@ -117,7 +117,8 @@ class CodeSplitITCase extends BatchTestBase {
     for (i <- 0 until 100) {
       expected.add(s"+I[${Range(0, 100).map(_ => s"$i").mkString(", ")}]")
     }
-    Assert.assertEquals(expected, 
TestValuesTableFactory.getResults("test_many_values"))
+    assertThatIterable(TestValuesTableFactory.getResults("test_many_values"))
+      .containsExactlyElementsOf(expected)
   }
 
   /**
@@ -164,17 +165,17 @@ class CodeSplitITCase extends BatchTestBase {
     val result = executeQuery(table)
 
     // The result table should contain 250 columns from a_1 to a_250 and 10 
rows with values from 1 to 10.
-    Assert.assertEquals(10, result.size)
+    assertThat(result.size).isEqualTo(10)
     for (rowNumber <- result.indices) {
       val row = result(rowNumber)
-      Assert.assertEquals(250, row.getArity)
+      assertThat(row.getArity).isEqualTo(250)
       for (j <- 1 to 250) {
         // column value starts from 1 and ends at 10.
         val expectedRowValue = rowNumber + 1
-        Assert.assertEquals(
-          "Invalid value for row %d and column a_%d.".format(rowNumber, j),
-          expectedRowValue,
-          row.getField("a_" + j))
+        assertThat(row.getField("a_" + j))
+          .withFailMessage("Invalid value for row %d and column 
a_%d.".format(rowNumber, j))
+          .isEqualTo(expectedRowValue)
+
       }
     }
   }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala
index fcad3d9d6b9..b3ddc833b75 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala
@@ -31,18 +31,18 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import 
org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.{MyPojo,
 MyPojoFunc}
-import org.apache.flink.table.planner.utils.{HierarchyTableFunction, 
PojoTableFunc, RichTableFunc1, RichTableFuncWithFinish, TableFunc0, TableFunc1, 
TableFunc2, TableFunc3, VarArgsFunc0}
+import org.apache.flink.table.planner.utils._
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.runtime.typeutils.StringDataTypeInfo
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Test}
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 import scala.collection.JavaConversions._
 
 class CorrelateITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("inputT", TableFunctionITCase.testData, type3, "a, b, 
c")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala
index 892150d463d..4b0676c8d8b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala
@@ -24,13 +24,11 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase._
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit
 import org.apache.flink.table.planner.runtime.utils.TestData._
 
-import org.junit.{Before, Test}
-
-import scala.collection.Seq
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class CorrelateITCase2 extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("inputT", TableFunctionITCase.testData, type3, "a, b, 
c")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
index 11aafa004a2..054022fd590 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala
@@ -18,16 +18,15 @@
 package org.apache.flink.table.planner.runtime.batch.sql
 
 import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.planner.utils.TestLegacyLimitableTableSource
 
-import org.junit._
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class LegacyLimitITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala
index 4ef34d8fdce..0ce2146be49 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala
@@ -20,16 +20,16 @@ package org.apache.flink.table.planner.runtime.batch.sql
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
-import org.apache.flink.table.api.config.TableConfigOptions
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
 import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.{createFileInTempFolder,
 createTempFolder}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
-import org.apache.flink.table.planner.utils.{TableTestUtil, 
TestDataTypeTableSource, TestFileInputFormatTableSource, 
TestInputFormatTableSource, TestLegacyFilterableTableSource, 
TestLegacyProjectableTableSource, TestNestedProjectableTableSource, 
TestPartitionableSourceFactory, TestTableSourceSinks}
+import org.apache.flink.table.planner.utils._
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before, Test}
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 import java.io.FileWriter
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
@@ -40,7 +40,7 @@ import scala.collection.mutable
 
 class LegacyTableSourceITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     env.setParallelism(1) // set sink parallelism to 1
@@ -373,6 +373,6 @@ class LegacyTableSourceITCase extends BatchTestBase {
 
     val result = TableTestUtil.readFromFile(resultPath)
     val expected = Seq("31,31,31.0", "32,32,32.0", "32,32,32.0")
-    Assert.assertEquals(expected.sorted, result.sorted)
+    assertThat(expected.sorted).isEqualTo(result.sorted)
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala
index 68c5d5a468f..0ed5587a13e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala
@@ -21,15 +21,13 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData.numericType
 
-import org.junit.{Before, Test}
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 import java.math.{BigDecimal => JBigDecimal}
 
-import scala.collection.Seq
-
 class Limit0RemoveITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     lazy val numericData = Seq(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
index edf0a0b0b4e..7dcd5de370a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
@@ -21,7 +21,11 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
 import org.apache.flink.table.planner.runtime.utils.TestData.{data3, 
nullablesOfData3, type3}
 
+import org.junit.jupiter.api.BeforeEach
+
 class LimitITCase extends LegacyLimitITCase {
+
+  @BeforeEach
   override def before(): Unit = {
     BatchTestBase.configForMiniCluster(tableConfig)
     registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
index 648a61c08a5..fd5a5a4343e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.sql
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
 import 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM
 import org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCaseHelper
 import 
org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.SortMergeJoin
@@ -28,9 +29,8 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData.{buildInData, 
buildInType}
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Test}
-
-import scala.collection.Seq
+import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 /** Misc tests. */
 class MiscITCase extends BatchTestBase {
@@ -39,7 +39,7 @@ class MiscITCase extends BatchTestBase {
 
   private var newTableId = 0
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("testTable", buildInData, buildInType, 
"a,b,c,d,e,f,g,h,i,j")
@@ -528,82 +528,97 @@ class MiscITCase extends BatchTestBase {
       Seq(row(false, 1), row(true, 3)))
   }
 
-  @Test(expected = classOf[org.apache.flink.table.api.ValidationException])
+  @Test
   def testTableGenerateFunction(): Unit = {
-    checkResult(
-      "SELECT f, g, v FROM testTable," +
-        "LATERAL TABLE(STRING_SPLIT(f, ' ')) AS T(v)",
-      Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", 
null, "fg"))
-    )
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "SELECT f, g, v FROM testTable," +
+            "LATERAL TABLE(STRING_SPLIT(f, ' ')) AS T(v)",
+          Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", 
null, "fg"))
+        )).isInstanceOf(classOf[ValidationException])
 
     // BuildInFunctions in SQL is case insensitive
-    checkResult(
-      "SELECT f, g, v FROM testTable," +
-        "LATERAL TABLE(sTRING_sPLIT(f, ' ')) AS T(v)",
-      Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", 
null, "fg"))
-    )
-
-    checkResult(
-      "SELECT f, g, v FROM testTable," +
-        "LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v)",
-      Seq(
-        row("abcd", "f%g", 0),
-        row(null, "hij_k", 0),
-        row(null, "hij_k", 1),
-        row("e fg", null, 0),
-        row("e fg", null, 1),
-        row("e fg", null, 2))
-    )
-
-    checkResult(
-      "SELECT f, g, v FROM testTable," +
-        "LATERAL TABLE(JSON_TUPLE('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": 
\"b3\"}'," +
-        "'a1', f)) AS T(v)",
-      Seq(
-        row("abcd", "f%g", "b1"),
-        row("abcd", "f%g", null),
-        row(null, "hij_k", "b1"),
-        row(null, "hij_k", null),
-        row("e fg", null, "b1"),
-        row("e fg", null, "b3"))
-    )
-
-    checkResult(
-      "SELECT f, g, v FROM " +
-        "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
-        "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS 
T(v) " +
-        "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = 
CHAR_LENGTH(v) + 3",
-      Seq(
-        row("abcd", "f%g", "b1"),
-        row(null, "hij_k", "b1"),
-        row("e fg", null, "b1"),
-        row("e fg", null, "b3"))
-    )
-
-    checkResult(
-      "SELECT f, g, v FROM " +
-        "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
-        "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS 
T(v) " +
-        "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = 
CHAR_LENGTH(v) + 3",
-      Seq(
-        row("abcd", "f%g", "b1"),
-        row(null, "hij_k", "b1"),
-        row("e fg", null, "b1"),
-        row("e fg", null, "b3"))
-    )
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "SELECT f, g, v FROM testTable," +
+            "LATERAL TABLE(sTRING_sPLIT(f, ' ')) AS T(v)",
+          Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", 
null, "fg"))
+        )).isInstanceOf(classOf[ValidationException])
+
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "SELECT f, g, v FROM testTable," +
+            "LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v)",
+          Seq(
+            row("abcd", "f%g", 0),
+            row(null, "hij_k", 0),
+            row(null, "hij_k", 1),
+            row("e fg", null, 0),
+            row("e fg", null, 1),
+            row("e fg", null, 2))
+        )).isInstanceOf(classOf[ValidationException])
+
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "SELECT f, g, v FROM testTable," +
+            "LATERAL TABLE(JSON_TUPLE('{\"a1\": \"b1\", \"a2\": \"b2\", \"e 
fg\": \"b3\"}'," +
+            "'a1', f)) AS T(v)",
+          Seq(
+            row("abcd", "f%g", "b1"),
+            row("abcd", "f%g", null),
+            row(null, "hij_k", "b1"),
+            row(null, "hij_k", null),
+            row("e fg", null, "b1"),
+            row("e fg", null, "b3"))
+        )).isInstanceOf(classOf[ValidationException])
+
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "SELECT f, g, v FROM " +
+            "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
+            "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) 
AS T(v) " +
+            "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = 
CHAR_LENGTH(v) + 3",
+          Seq(
+            row("abcd", "f%g", "b1"),
+            row(null, "hij_k", "b1"),
+            row("e fg", null, "b1"),
+            row("e fg", null, "b3"))
+        )).isInstanceOf(classOf[ValidationException])
+
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "SELECT f, g, v FROM " +
+            "testTable JOIN LATERAL TABLE(JSON_TUPLE" +
+            "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) 
AS T(v) " +
+            "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = 
CHAR_LENGTH(v) + 3",
+          Seq(
+            row("abcd", "f%g", "b1"),
+            row(null, "hij_k", "b1"),
+            row("e fg", null, "b1"),
+            row("e fg", null, "b3"))
+        )).isInstanceOf(classOf[ValidationException])
   }
 
   /**
    * Due to the improper translation of TableFunction left outer join (see 
CALCITE-2004), the join
    * predicate can only be empty or literal true (the restriction should be 
removed in FLINK-7865).
    */
-  @Test(expected = classOf[org.apache.flink.table.api.ValidationException])
+  @Test
   def testTableGenerateFunctionLeftJoin(): Unit = {
-    checkResult(
-      "SELECT f, g, v FROM " +
-        "testTable LEFT OUTER JOIN LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS 
INTEGER))) AS T(v) " +
-        "ON LENGTH(f) = v + 2 OR LENGTH(g) = v + 4",
-      Seq(row(null, "hij_k", 1), row("e fg", null, 2))
-    )
+    assertThatThrownBy(
+      () =>
+        checkResult(
+          "SELECT f, g, v FROM " +
+            "testTable LEFT OUTER JOIN LATERAL TABLE(GENERATE_SERIES(0, CAST(b 
AS INTEGER))) AS T(v) " +
+            "ON LENGTH(f) = v + 2 OR LENGTH(g) = v + 4",
+          Seq(row(null, "hij_k", 1), row("e fg", null, 2))
+        )).isInstanceOf(classOf[ValidationException])
+
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
index f25d2d96ec9..e68b6b63010 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala
@@ -25,12 +25,11 @@ import 
org.apache.flink.configuration.JobManagerOptions.SchedulerType
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, 
ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Test}
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
+import org.junit.jupiter.api.{BeforeEach, TestTemplate}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.collection.JavaConversions._
 import scala.util.Random
@@ -42,11 +41,15 @@ import scala.util.Random
  * IT cases are picked from
  * [[org.apache.flink.table.planner.plan.batch.sql.MultipleInputCreationTest]].
  */
-@RunWith(classOf[Parameterized])
-class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: 
SchedulerType)
-  extends BatchTestBase {
+@ExtendWith(Array(classOf[ParameterizedTestExtension]))
+class MultipleInputITCase extends BatchTestBase {
 
-  @Before
+  @Parameter var shuffleMode: BatchShuffleMode = _
+
+  @Parameter(value = 1)
+  var schedulerType: SchedulerType = _
+
+  @BeforeEach
   override def before(): Unit = {
     super.before()
 
@@ -79,7 +82,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
     tEnv.getConfig.set(JobManagerOptions.SCHEDULER, schedulerType)
   }
 
-  @Test
+  @TestTemplate
   def testBasicMultipleInput(): Unit = {
     checkMultipleInputResult("""
                                |SELECT * FROM
@@ -90,7 +93,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
                                |""".stripMargin)
   }
 
-  @Test
+  @TestTemplate
   def testManyMultipleInputs(): Unit = {
     checkMultipleInputResult(
       """
@@ -116,7 +119,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
         |""".stripMargin)
   }
 
-  @Test
+  @TestTemplate
   def testJoinWithAggAsProbe(): Unit = {
     checkMultipleInputResult(
       """
@@ -130,7 +133,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
     )
   }
 
-  @Test
+  @TestTemplate
   def testNoPriorityConstraint(): Unit = {
     checkMultipleInputResult(
       """
@@ -141,7 +144,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
     )
   }
 
-  @Test
+  @TestTemplate
   def testRelatedInputs(): Unit = {
     checkMultipleInputResult(
       """
@@ -157,7 +160,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
     )
   }
 
-  @Test
+  @TestTemplate
   def testRelatedInputsWithAgg(): Unit = {
     checkMultipleInputResult(
       """
@@ -173,7 +176,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
     )
   }
 
-  @Test
+  @TestTemplate
   def testDeadlockCausedByExchangeInAncestor(): Unit = {
     checkMultipleInputResult(
       """
@@ -185,7 +188,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, 
schedulerType: Schedule
     )
   }
 
-  @Test
+  @TestTemplate
   def testMaxSupportedInputs(): Unit = {
     val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO)
     val data = Seq(BatchTestBase.row(1, "test"))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
index 979fa83e8e3..250ba36a6f8 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, 
FLOAT_TYPE_INFO, INT_TYPE_INFO, SHORT_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
@@ -32,17 +32,16 @@ import 
org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Test}
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 import java.lang.{Iterable => JIterable, Long => JLong}
 import java.util.{Collections, Optional}
 
-import scala.collection.Seq
 import scala.util.Random
 
 class OverAggregateITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("Table5", data5, type5, "d, e, f, g, h", 
nullablesOfData5)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index a16b7c6d540..8e4d4ce5b72 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.{BatchExecutionOptions, Configuration}
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions
+import org.apache.flink.core.testutils.EachCallbackWrapper
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.{Schema, TableEnvironment, TableException, 
TableSchema, ValidationException}
@@ -31,39 +32,33 @@ import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
 import org.apache.flink.table.descriptors.DescriptorProperties
 import org.apache.flink.table.descriptors.Schema.SCHEMA
 import org.apache.flink.table.factories.TableSinkFactory
-import 
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4,
 type_int_string, _}
+import 
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4,
 _}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, 
TableSink}
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.table.utils.LegacyRowResource
+import org.apache.flink.table.utils.LegacyRowExtension
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Rule, Test}
-import org.junit.Assert._
-import org.junit.rules.ExpectedException
+import org.assertj.core.api.Assertions.{assertThat, assertThatIterable, 
assertThatThrownBy}
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.extension.RegisterExtension
 
 import java.util
 import java.util.{function, ArrayList => JArrayList, LinkedList => 
JLinkedList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-import scala.collection.Seq
 
 /** Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]]. */
 class PartitionableSinkITCase extends BatchTestBase {
 
-  private val _expectedException = ExpectedException.none
+  @RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] =
+    new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension)
 
-  @Rule
-  def expectedEx: ExpectedException = _expectedException
-
-  @Rule
-  def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
-
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     env.setParallelism(3)
@@ -83,9 +78,10 @@ class PartitionableSinkITCase extends BatchTestBase {
         "insert into sinkTable select a, max(b), c"
           + " from nonSortTable group by a, c")
       .await()
-    assertEquals(List("1,5,Hi", "1,5,Hi01", "1,5,Hi02"), RESULT1.sorted)
-    assert(RESULT2.isEmpty)
-    assertEquals(
+    assertThatIterable(RESULT1).containsExactlyInAnyOrderElementsOf(
+      List("1,5,Hi", "1,5,Hi01", "1,5,Hi02"))
+    assertThat(RESULT2.isEmpty()).isTrue
+    assertThatIterable(RESULT3).containsExactlyInAnyOrderElementsOf(
       List(
         "2,1,Hello world01",
         "2,1,Hello world02",
@@ -98,34 +94,33 @@ class PartitionableSinkITCase extends BatchTestBase {
         "3,2,Hello02",
         "3,2,Hello03",
         "3,2,Hello04"
-      ),
-      RESULT3.sorted
-    )
+      ))
   }
 
   @Test
   def testInsertWithPartitionGrouping(): Unit = {
     registerTableSink()
     tEnv.executeSql("insert into sinkTable select a, b, c from 
sortTable").await()
-    assertEquals(List("1,1,Hello world", "1,1,Hello world, how are you?"), 
RESULT1.toList)
-    assertEquals(
-      List("4,4,你好,陌生人", "4,4,你好,陌生人,我是", "4,4,你好,陌生人,我是中国人", 
"4,4,你好,陌生人,我是中国人,你来自哪里?"),
-      RESULT2.toList)
-    assertEquals(
+    assertThatIterable(RESULT1).containsExactlyElementsOf(
+      List("1,1,Hello world", "1,1,Hello world, how are you?"))
+    assertThatIterable(RESULT2).containsExactlyElementsOf(
+      List("4,4,你好,陌生人", "4,4,你好,陌生人,我是", "4,4,你好,陌生人,我是中国人", 
"4,4,你好,陌生人,我是中国人,你来自哪里?"))
+    assertThatIterable(RESULT3).containsExactlyElementsOf(
       List(
         "2,2,Hi",
         "2,2,Hello",
         "3,3,I'm fine, thank",
         "3,3,I'm fine, thank you",
-        "3,3,I'm fine, thank you, and you?"),
-      RESULT3.toList)
+        "3,3,I'm fine, thank you, and you?"))
   }
 
   @Test
   def testInsertWithStaticPartitions(): Unit = {
     registerTableSink()
     tEnv.executeSql("insert into sinkTable partition(a=1) select b, c from 
sortTable").await()
-    assertEquals(
+    assertThatIterable(
+      RESULT1
+    ).containsExactlyElementsOf(
       List(
         "1,2,Hi",
         "1,1,Hello world",
@@ -138,51 +133,47 @@ class PartitionableSinkITCase extends BatchTestBase {
         "1,4,你好,陌生人,我是",
         "1,4,你好,陌生人,我是中国人",
         "1,4,你好,陌生人,我是中国人,你来自哪里?"
-      ),
-      RESULT1.toList
-    )
-    assert(RESULT2.isEmpty)
-    assert(RESULT3.isEmpty)
+      ))
+    assertThat(RESULT2.isEmpty).isTrue
+    assertThat(RESULT3.isEmpty).isTrue
   }
 
   @Test
   def testInsertWithStaticAndDynamicPartitions(): Unit = {
     registerTableSink(partitionColumns = Array("a", "b"))
     tEnv.executeSql("insert into sinkTable partition(a=1) select b, c from 
sortTable").await()
-    assertEquals(List("1,1,Hello world", "1,1,Hello world, how are you?"), 
RESULT1.toList)
-    assertEquals(
-      List("1,4,你好,陌生人", "1,4,你好,陌生人,我是", "1,4,你好,陌生人,我是中国人", 
"1,4,你好,陌生人,我是中国人,你来自哪里?"),
-      RESULT2.toList)
-    assertEquals(
+    assertThatIterable(RESULT1).containsExactlyElementsOf(
+      List("1,1,Hello world", "1,1,Hello world, how are you?"))
+    assertThatIterable(RESULT2).containsExactlyElementsOf(
+      List("1,4,你好,陌生人", "1,4,你好,陌生人,我是", "1,4,你好,陌生人,我是中国人", 
"1,4,你好,陌生人,我是中国人,你来自哪里?"))
+    assertThatIterable(RESULT3).containsExactlyElementsOf(
       List(
         "1,2,Hi",
         "1,2,Hello",
         "1,3,I'm fine, thank",
         "1,3,I'm fine, thank you",
-        "1,3,I'm fine, thank you, and you?"),
-      RESULT3.toList)
+        "1,3,I'm fine, thank you, and you?"))
   }
 
   @Test
   def testInsertWithStaticPartitionAndStarSource(): Unit = {
     registerTableSink(partitionColumns = Array("b", "c"))
     tEnv.executeSql("insert into sinkTable partition(b=1) select * from 
starTable").await()
-    assertEquals(
+    assertThatIterable(RESULT1).containsExactlyElementsOf(
       List(
         "1,1,Hello world, how are you?",
         "3,1,I'm fine, thank you",
         "4,1,你好,陌生人",
-        "4,1,你好,陌生人,我是中国人"),
-      RESULT1.toList)
-    assertEquals(List("4,1,你好,陌生人,我是", "4,1,你好,陌生人,我是中国人,你来自哪里?"), 
RESULT2.toList)
-    assertEquals(
+        "4,1,你好,陌生人,我是中国人"))
+    assertThatIterable(RESULT2).containsExactlyElementsOf(
+      List("4,1,你好,陌生人,我是", "4,1,你好,陌生人,我是中国人,你来自哪里?"))
+    assertThatIterable(RESULT3).containsExactlyElementsOf(
       List(
         "2,1,Hello",
         "1,1,Hello world",
         "2,1,Hi",
         "3,1,I'm fine, thank",
-        "3,1,I'm fine, thank you, and you?"),
-      RESULT3.toList)
+        "3,1,I'm fine, thank you, and you?"))
   }
 
   @Test
@@ -193,23 +184,27 @@ class PartitionableSinkITCase extends BatchTestBase {
         "insert into sinkTable partition(b=1)\n"
           + "(values (1, 'Hello world, how are you?'), (4, '你好,陌生人,我是'), (2, 
'Hello'))")
       .await()
-    assertEquals(List("1,1,Hello world, how are you?"), RESULT1.toList)
-    assertEquals(List("4,1,你好,陌生人,我是"), RESULT2.toList)
-    assertEquals(List("2,1,Hello"), RESULT3.toList)
+    assertThatIterable(RESULT1).containsExactlyElementsOf(List("1,1,Hello 
world, how are you?"))
+    
assertThatIterable(RESULT2).containsExactlyElementsOf(List("4,1,你好,陌生人,我是"))
+    assertThatIterable(RESULT3).containsExactlyElementsOf(List("2,1,Hello"))
   }
 
   @Test
   def testStaticPartitionNotInPartitionFields(): Unit = {
-    expectedEx.expect(classOf[ValidationException])
     registerTableSink(tableName = "sinkTable2", rowType = type4, 
partitionColumns = Array("a", "b"))
-    tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b from 
sortTable").await()
+    assertThatThrownBy(
+      () =>
+        tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b 
from sortTable").await())
+      .isInstanceOf(classOf[ValidationException])
   }
 
   @Test
   def testInsertStaticPartitionOnNonPartitionedSink(): Unit = {
-    expectedEx.expect(classOf[TableException])
     registerTableSink(tableName = "sinkTable2", rowType = type4, 
partitionColumns = Array())
-    tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b from 
sortTable").await()
+    assertThatThrownBy(
+      () =>
+        tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b 
from sortTable").await())
+      .isInstanceOf(classOf[TableException])
   }
 
   private def registerTableSink(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 8338d3d3caf..9f657eddb72 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -19,28 +19,31 @@ package org.apache.flink.table.planner.runtime.batch.sql
 
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, 
CatalogPartitionSpec, ObjectPath}
 import org.apache.flink.table.planner.factories.{TestValuesCatalog, 
TestValuesTableFactory}
-import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase
 import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.createTempFolder
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.utils.TestingTableEnvironment
 import org.apache.flink.table.resource.{ResourceType, ResourceUri}
+import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, 
ParameterizedTestExtension, Parameters}
 import org.apache.flink.util.UserClassLoaderJarTestUtils
 
-import org.junit.{Before, Test}
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
+import org.junit.jupiter.api.{BeforeEach, TestTemplate}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import java.util
 import java.util.Collections
 
 import scala.collection.JavaConversions._
 
-@RunWith(classOf[Parameterized])
-class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val 
useCatalogFilter: Boolean)
-  extends BatchTestBase {
+@ExtendWith(Array(classOf[ParameterizedTestExtension]))
+class PartitionableSourceITCase extends BatchTestBase {
 
-  @Before
+  @Parameter val sourceFetchPartitions: Boolean = false
+
+  @Parameter(value = 1)
+  val useCatalogFilter: Boolean = false
+
+  @BeforeEach
   override def before(): Unit = {
     super.before()
 
@@ -129,7 +132,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: 
Boolean, val useCatal
     }
   }
 
-  @Test
+  @TestTemplate
   def testSimplePartitionFieldPredicate1(): Unit = {
     checkResult(
       "SELECT * FROM PartitionableTable WHERE part1 = 'A'",
@@ -140,7 +143,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: 
Boolean, val useCatal
       ))
   }
 
-  @Test
+  @TestTemplate
   def testPartialPartitionFieldPredicatePushDown(): Unit = {
     checkResult(
       "SELECT * FROM PartitionableTable WHERE (id > 2 OR part1 = 'A') AND 
part2 > 1",
@@ -150,7 +153,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: 
Boolean, val useCatal
       ))
   }
 
-  @Test
+  @TestTemplate
   def testUnconvertedExpression(): Unit = {
     checkResult(
       "select * from PartitionableTable where trim(part1) = 'A' and part2 > 1",
@@ -159,7 +162,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: 
Boolean, val useCatal
       ))
   }
 
-  @Test
+  @TestTemplate
   def testPushDownPartitionAndFiltersContainPartitionKeys(): Unit = {
     checkResult(
       "SELECT * FROM PartitionableAndFilterableTable WHERE part1 = 'A' AND id 
> 1",
@@ -169,7 +172,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: 
Boolean, val useCatal
       ))
   }
 
-  @Test
+  @TestTemplate
   def 
testPushDownPartitionAndFiltersContainPartitionKeysWithSingleProjection(): Unit 
= {
     checkResult(
       "SELECT name FROM PartitionableAndFilterableTable WHERE part1 = 'A' AND 
id > 1",
@@ -179,7 +182,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: 
Boolean, val useCatal
       ))
   }
 
-  @Test
+  @TestTemplate
   def testPartitionPrunerCompileClassLoader(): Unit = {
     val udfJavaCode =
       s"""
@@ -213,7 +216,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: 
Boolean, val useCatal
 }
 
 object PartitionableSourceITCase {
-  @Parameterized.Parameters(name = "sourceFetchPartitions={0}, 
useCatalogFilter={1}")
+  @Parameters(name = "sourceFetchPartitions={0}, useCatalogFilter={1}")
   def parameters(): util.Collection[Array[Any]] = {
     Seq[Array[Any]](
       Array(true, false),
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
index bf7324b71a2..10662856840 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
@@ -21,13 +21,11 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 
-import org.junit._
-
-import scala.collection.Seq
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class RankITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala
index 2da8e735d11..a0d951fc49b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala
@@ -21,23 +21,25 @@ import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_T
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCaseHelper
-import 
org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.{JoinType, _}
+import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType._
 import org.apache.flink.table.planner.runtime.utils.{BatchTableEnvUtil, 
BatchTestBase}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
+import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, 
ParameterizedTestExtension, Parameters}
 
-import org.junit.{Before, Test}
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
+import org.junit.jupiter.api.{BeforeEach, TestTemplate}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import java.util
 
 import scala.util.Random
 
-@RunWith(classOf[Parameterized])
-class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase {
+@ExtendWith(Array(classOf[ParameterizedTestExtension]))
+class SetOperatorsITCase extends BatchTestBase {
 
-  @Before
+  @Parameter var joinType: JoinType = _
+
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("AllNullTable3", allNullData3, type3, "a, b, c")
@@ -47,7 +49,7 @@ class SetOperatorsITCase(joinType: JoinType) extends 
BatchTestBase {
     JoinITCaseHelper.disableOtherJoinOpForJoin(tEnv, joinType)
   }
 
-  @Test
+  @TestTemplate
   def testIntersect(): Unit = {
     val data = List(
       row(1, 1L, "Hi"),
@@ -66,14 +68,14 @@ class SetOperatorsITCase(joinType: JoinType) extends 
BatchTestBase {
     checkResult("SELECT c FROM SmallTable3 INTERSECT SELECT c FROM T", 
Seq(row("Hi"), row("Hello")))
   }
 
-  @Test
+  @TestTemplate
   def testIntersectWithFilter(): Unit = {
     checkResult(
       "SELECT c FROM ((SELECT * FROM SmallTable3) INTERSECT (SELECT * FROM 
Table3)) WHERE a > 1",
       Seq(row("Hello"), row("Hello world")))
   }
 
-  @Test
+  @TestTemplate
   def testExcept(): Unit = {
     val data = List(row(1, 1L, "Hi"))
     BatchTableEnvUtil.registerCollection(
@@ -88,7 +90,7 @@ class SetOperatorsITCase(joinType: JoinType) extends 
BatchTestBase {
       Seq(row("Hello"), row("Hello world")))
   }
 
-  @Test
+  @TestTemplate
   def testExceptWithFilter(): Unit = {
     checkResult(
       "SELECT c FROM (" +
@@ -97,24 +99,24 @@ class SetOperatorsITCase(joinType: JoinType) extends 
BatchTestBase {
       Seq(row("Hi")))
   }
 
-  @Test
+  @TestTemplate
   def testIntersectWithNulls(): Unit = {
     checkResult("SELECT c FROM AllNullTable3 INTERSECT SELECT c FROM 
AllNullTable3", Seq(row(null)))
   }
 
-  @Test
+  @TestTemplate
   def testExceptWithNulls(): Unit = {
     checkResult("SELECT c FROM AllNullTable3 EXCEPT SELECT c FROM 
AllNullTable3", Seq())
   }
 
-  @Test
+  @TestTemplate
   def testIntersectAll(): Unit = {
     BatchTableEnvUtil.registerCollection(tEnv, "T1", Seq(1, 1, 1, 2, 2), "c")
     BatchTableEnvUtil.registerCollection(tEnv, "T2", Seq(1, 2, 2, 2, 3), "c")
     checkResult("SELECT c FROM T1 INTERSECT ALL SELECT c FROM T2", Seq(row(1), 
row(2), row(2)))
   }
 
-  @Test
+  @TestTemplate
   def testMinusAll(): Unit = {
     BatchTableEnvUtil.registerCollection(tEnv, "T2", Seq((1, 1L, "Hi")), "a, 
b, c")
     val t1 = "SELECT * FROM SmallTable3"
@@ -134,13 +136,13 @@ class SetOperatorsITCase(joinType: JoinType) extends 
BatchTestBase {
 }
 
 object SetOperatorsITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")
   def parameters(): util.Collection[Array[_]] = {
     util.Arrays.asList(
       // TODO
-//      Array(BroadcastHashJoin),
-//      Array(HashJoin),
-//      Array(NestedLoopJoin),
+      //      Array(BroadcastHashJoin),
+      //      Array(HashJoin),
+      //      Array(NestedLoopJoin),
       Array(SortMergeJoin))
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
index f5ffd20f68c..06f703e9f1e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala
@@ -22,11 +22,11 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.types.Row
 
-import org.junit._
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class SortLimitITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     env.setParallelism(1) // set sink parallelism to 1
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala
index 2d6f0fa70f0..d170b1b3395 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.utils.{TestTableSourceWithTime, 
WithoutTimeAttributesTableSource}
 import org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 import java.lang.{Integer => JInt}
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index b2e048c80a6..d0fdc2ad792 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.planner.runtime.batch.sql
 
 import org.apache.flink.configuration.MemorySize
-import org.apache.flink.core.testutils.FlinkMatchers
+import org.apache.flink.core.testutils.FlinkAssertions
 import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.createTempFolder
@@ -27,9 +27,8 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData.smallData3
 import org.apache.flink.table.planner.utils.TableTestUtil
 
-import org.assertj.core.api.Assertions
-import org.hamcrest.MatcherAssert
-import org.junit.{Assert, Test}
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.junit.jupiter.api.Test
 
 class TableSinkITCase extends BatchTestBase {
 
@@ -69,28 +68,20 @@ class TableSinkITCase extends BatchTestBase {
       s"insert into MySink /*+ OPTIONS('path' = '$newPath2') */ select * from 
MyTable")
     stmtSet.execute().await()
 
-    Assert.assertTrue(TableTestUtil.readFromFile(resultPath).isEmpty)
+    assertThat(TableTestUtil.readFromFile(resultPath).isEmpty).isTrue
     val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     val result1 = TableTestUtil.readFromFile(newPath1)
-    Assert.assertEquals(expected.sorted, result1.sorted)
+    assertThat(expected.sorted).isEqualTo(result1.sorted)
     val result2 = TableTestUtil.readFromFile(newPath2)
-    Assert.assertEquals(expected.sorted, result2.sorted)
+    assertThat(expected.sorted).isEqualTo(result2.sorted)
   }
 
   @Test
   def testCollectSinkConfiguration(): Unit = {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, 
MemorySize.parse("1b"))
-    try {
-      checkResult("SELECT 1", Seq(row(1)))
-      Assert.fail("Expecting exception thrown from collect sink")
-    } catch {
-      case e: Exception =>
-        MatcherAssert.assertThat(
-          e,
-          FlinkMatchers.containsMessage(
-            "Please consider increasing max bytes per batch value " +
-              "by setting collect-sink.batch-size.max"))
-    }
+    assertThatThrownBy(() => checkResult("SELECT 1", Seq(row(1))))
+      .satisfies(FlinkAssertions.anyCauseMatches(
+        "Please consider increasing max bytes per batch value by setting 
collect-sink.batch-size.max"))
 
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, 
MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
@@ -125,7 +116,7 @@ class TableSinkITCase extends BatchTestBase {
       .await()
     val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     val result = TableTestUtil.readFromFile(resultPath)
-    Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)
+    assertThat(result.sorted).isEqualTo(expected.sorted)
 
     // test statement set
     val statementSet = tEnv.createStatementSet()
@@ -142,7 +133,7 @@ class TableSinkITCase extends BatchTestBase {
                                  |""".stripMargin)
     statementSet.execute().await()
     val useStatementResult = TableTestUtil.readFromFile(useStatementResultPath)
-    Assertions.assertThat(useStatementResult.sorted).isEqualTo(expected.sorted)
+    assertThat(useStatementResult.sorted).isEqualTo(expected.sorted)
   }
 
   @Test
@@ -161,16 +152,15 @@ class TableSinkITCase extends BatchTestBase {
                        |)
        """.stripMargin)
 
-    Assertions
-      .assertThatThrownBy(
-        () =>
-          tEnv
-            .executeSql("""
-                          |CREATE TABLE MyCtasTable
-                          | AS
-                          | SELECT * FROM MyTable
-                          |""".stripMargin)
-            .await())
+    assertThatThrownBy(
+      () =>
+        tEnv
+          .executeSql("""
+                        |CREATE TABLE MyCtasTable
+                        | AS
+                        | SELECT * FROM MyTable
+                        |""".stripMargin)
+          .await())
       .hasRootCauseMessage("\nExpecting actual not to be null")
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 70eaea46015..e3c987fd6a2 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql
 
-import org.apache.flink.table.catalog.ObjectPath
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
@@ -26,11 +25,12 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.utils._
 import org.apache.flink.util.FileUtils
 
-import org.junit.{Assert, Before, Test}
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class TableSourceITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     env.setParallelism(1) // set sink parallelism to 1
@@ -380,9 +380,9 @@ class TableSourceITCase extends BatchTestBase {
                            |""".stripMargin)
     stmtSet.execute().await()
 
-    val result = TableTestUtil.readFromFile(resultPath)
-    val expected = Seq("2,2,Hello", "3,2,Hello world", "3,2,Hello world")
-    Assert.assertEquals(expected.sorted, result.sorted)
+    val result = TableTestUtil.readFromFile(resultPath).sorted
+    val expected = List("2,2,Hello", "3,2,Hello world", "3,2,Hello 
world").sorted
+    assertThat(result).isEqualTo(expected)
   }
 
   @Test
@@ -425,6 +425,6 @@ class TableSourceITCase extends BatchTestBase {
       "3,2,Hello world",
       "3,2,Hello world",
       "3,2,Hello world")
-    Assert.assertEquals(expected.sorted, result.sorted)
+    assertThat(expected.sorted).isEqualTo(result.sorted)
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
index 00b3c9a6477..fcc89579cef 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala
@@ -25,9 +25,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 
-import org.junit._
-
-import scala.collection.Seq
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class UnionITCase extends BatchTestBase {
 
@@ -40,7 +38,7 @@ class UnionITCase extends BatchTestBase {
     binaryRow(type6.toRowFieldTypes, 4, 3L, fromString("Hello world, how are 
you?"))
   )
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection("Table3", smallData3, type3, "a, b, c", 
nullablesOfSmallData3)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
index bbad3d8b59e..1fefbf23230 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
@@ -24,10 +24,9 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime
 import org.apache.flink.types.Row
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 import scala.collection.JavaConverters._
-import scala.collection.Seq
 
 class UnnestITCase extends BatchTestBase {
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala
index 4384a2d6c5b..cccc3e9dc5c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.sql
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class ValuesITCase extends BatchTestBase {
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala
index 8bc0b75fe33..d4bcb1d735a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala
@@ -25,11 +25,11 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
 
-import org.junit.{Before, Test}
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class WindowTableFunctionITCase extends BatchTestBase {
 
-  @Before
+  @BeforeEach
   override def before(): Unit = {
     super.before()
     registerCollection(

Reply via email to