This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 70ac66d567e483b084d528a60f5153aa38a19dbf Author: Yuxin Tan <[email protected]> AuthorDate: Fri May 12 16:03:07 2023 +0800 [FLINK-32058][tests] Migrate the subclasses of BatchAbstractTestBase in runtime.batch.sql to JUnit5 --- .../planner/runtime/batch/sql/CalcITCase.scala | 76 +++++----- .../runtime/batch/sql/CodeSplitITCase.scala | 23 +-- .../runtime/batch/sql/CorrelateITCase.scala | 6 +- .../runtime/batch/sql/CorrelateITCase2.scala | 6 +- .../runtime/batch/sql/LegacyLimitITCase.scala | 5 +- .../batch/sql/LegacyTableSourceITCase.scala | 10 +- .../runtime/batch/sql/Limit0RemoveITCase.scala | 6 +- .../planner/runtime/batch/sql/LimitITCase.scala | 4 + .../planner/runtime/batch/sql/MiscITCase.scala | 157 +++++++++++---------- .../runtime/batch/sql/MultipleInputITCase.scala | 35 ++--- .../runtime/batch/sql/OverAggregateITCase.scala | 7 +- .../batch/sql/PartitionableSinkITCase.scala | 103 +++++++------- .../batch/sql/PartitionableSourceITCase.scala | 33 +++-- .../planner/runtime/batch/sql/RankITCase.scala | 6 +- .../runtime/batch/sql/SetOperatorsITCase.scala | 40 +++--- .../runtime/batch/sql/SortLimitITCase.scala | 4 +- .../runtime/batch/sql/TableScanITCase.scala | 2 +- .../runtime/batch/sql/TableSinkITCase.scala | 50 +++---- .../runtime/batch/sql/TableSourceITCase.scala | 14 +- .../planner/runtime/batch/sql/UnionITCase.scala | 6 +- .../planner/runtime/batch/sql/UnnestITCase.scala | 3 +- .../planner/runtime/batch/sql/ValuesITCase.scala | 2 +- .../batch/sql/WindowTableFunctionITCase.scala | 4 +- 23 files changed, 301 insertions(+), 301 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala index 983cefc80c8..a0849349ace 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala @@ -44,9 +44,8 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil._ import org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime import org.apache.flink.types.Row -import org.junit._ -import org.junit.Assert.assertEquals -import org.junit.rules.ExpectedException +import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} +import org.junit.jupiter.api.{BeforeEach, Disabled, Test} import java.nio.charset.StandardCharsets import java.sql.{Date, Time, Timestamp} @@ -55,12 +54,7 @@ import java.util class CalcITCase extends BatchTestBase { - var _expectedEx: ExpectedException = ExpectedException.none - - @Rule - def expectedEx: ExpectedException = _expectedEx - - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3) @@ -398,9 +392,10 @@ class CalcITCase extends BatchTestBase { checkResult("SELECT `1-_./Ü`, b, c FROM (SELECT a as `1-_./Ü`, b, c FROM Table3)", data3) } - @Test(expected = classOf[ValidationException]) + @Test def testInvalidFields(): Unit = { - checkResult("SELECT a, foo FROM Table3", data3) + assertThatThrownBy(() => checkResult("SELECT a, foo FROM Table3", data3)) + .isInstanceOf(classOf[ValidationException]) } @Test @@ -1032,12 +1027,11 @@ class CalcITCase extends BatchTestBase { @Test def testMapTypeGroupBy(): Unit = { - _expectedEx.expectMessage( - "Type(MAP<INT NOT NULL, VARCHAR(5) NOT NULL> NOT NULL) is not an orderable data type, it is not supported as a ORDER_BY/GROUP_BY/JOIN_EQUAL field.") - checkResult( - "SELECT COUNT(*) FROM SmallTable3 GROUP BY MAP[1, 'Hello', 2, 'Hi']", - Seq() - ) + assertThatThrownBy( + () => + checkResult("SELECT COUNT(*) FROM SmallTable3 GROUP BY MAP[1, 'Hello', 2, 'Hi']", Seq())) + .hasMessage( + "Type(MAP<INT NOT NULL, VARCHAR(5) NOT NULL> NOT NULL) is not an orderable data type, it is not supported as a ORDER_BY/GROUP_BY/JOIN_EQUAL field.") } @Test @@ -1058,16 +1052,17 @@ class CalcITCase extends BatchTestBase { val result = executeQuery(table) val nestedRow = result.head.getField(0).asInstanceOf[Row] - assertEquals(data.head.getField(0), nestedRow.getField(0)) - assertEquals(data.head.getField(1), nestedRow.getField(1)) - assertEquals(data.head.getField(2), nestedRow.getField(2)) + assertThat(data.head.getField(0)).isEqualTo(nestedRow.getField(0)) + assertThat(data.head.getField(1)).isEqualTo(nestedRow.getField(1)) + assertThat(data.head.getField(2)).isEqualTo(nestedRow.getField(2)) val arr = result.head.getField(1).asInstanceOf[Array[Integer]] - assertEquals(12, arr(0)) - assertEquals(data.head.getField(1), arr(1)) + assertThat(12).isEqualTo(arr(0)) + assertThat(data.head.getField(1)).isEqualTo(arr(1)) val hashMap = result.head.getField(2).asInstanceOf[util.HashMap[String, Timestamp]] - assertEquals(data.head.getField(2), hashMap.get(data.head.getField(0).asInstanceOf[String])) + assertThat(data.head.getField(2)) + .isEqualTo(hashMap.get(data.head.getField(0).asInstanceOf[String])) } @Test @@ -1144,7 +1139,7 @@ class CalcITCase extends BatchTestBase { ) } - @Ignore // TODO support Unicode + @Disabled // TODO support Unicode @Test def testFunctionWithUnicodeParameters(): Unit = { val data = List( @@ -1326,7 +1321,7 @@ class CalcITCase extends BatchTestBase { val d1 = LocalDateConverter.INSTANCE.toInternal(result.toList.head.getField(0).asInstanceOf[LocalDate]) - Assert.assertTrue(d0 <= d1 && d1 - d0 <= 1) + assertThat(d0 <= d1 && d1 - d0 <= 1).isTrue } @Test @@ -1346,7 +1341,7 @@ class CalcITCase extends BatchTestBase { val ts2 = System.currentTimeMillis() - Assert.assertTrue(ts0 <= ts1 && ts1 <= ts2) + assertThat(ts0 <= ts1 && ts1 <= ts2).isTrue } @Test @@ -1534,7 +1529,7 @@ class CalcITCase extends BatchTestBase { ) } - @Test(expected = classOf[UnsupportedOperationException]) + @Test def testOrderByBinary(): Unit = { registerCollection( "BinaryT", @@ -1550,18 +1545,21 @@ class CalcITCase extends BatchTestBase { ) tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(1)) tableConfig.set(BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED, Boolean.box(true)) - checkResult( - "select * from BinaryT order by c", - nullData3 - .sortBy((x: Row) => x.getField(2).asInstanceOf[String]) - .map( - r => - row( - r.getField(0), - r.getField(1), - r.getField(2).toString.getBytes(StandardCharsets.UTF_8))), - isSorted = true - ) + + assertThatThrownBy( + () => + checkResult( + "select * from BinaryT order by c", + nullData3 + .sortBy((x: Row) => x.getField(2).asInstanceOf[String]) + .map( + r => + row( + r.getField(0), + r.getField(1), + r.getField(2).toString.getBytes(StandardCharsets.UTF_8))), + isSorted = true + )).isInstanceOf(classOf[UnsupportedOperationException]) } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala index 077978651f5..74384b4cf0d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala @@ -24,13 +24,13 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData.{nullablesOfData3, smallData3, type3} import org.apache.flink.types.Row -import org.junit.{Assert, Before, Test} - -import scala.collection.Seq +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.IterableAssert.assertThatIterable +import org.junit.jupiter.api.{BeforeEach, Test} class CodeSplitITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("SmallTable3", smallData3, type3, "a, b, c", nullablesOfData3) @@ -117,7 +117,8 @@ class CodeSplitITCase extends BatchTestBase { for (i <- 0 until 100) { expected.add(s"+I[${Range(0, 100).map(_ => s"$i").mkString(", ")}]") } - Assert.assertEquals(expected, TestValuesTableFactory.getResults("test_many_values")) + assertThatIterable(TestValuesTableFactory.getResults("test_many_values")) + .containsExactlyElementsOf(expected) } /** @@ -164,17 +165,17 @@ class CodeSplitITCase extends BatchTestBase { val result = executeQuery(table) // The result table should contain 250 columns from a_1 to a_250 and 10 rows with values from 1 to 10. - Assert.assertEquals(10, result.size) + assertThat(result.size).isEqualTo(10) for (rowNumber <- result.indices) { val row = result(rowNumber) - Assert.assertEquals(250, row.getArity) + assertThat(row.getArity).isEqualTo(250) for (j <- 1 to 250) { // column value starts from 1 and ends at 10. val expectedRowValue = rowNumber + 1 - Assert.assertEquals( - "Invalid value for row %d and column a_%d.".format(rowNumber, j), - expectedRowValue, - row.getField("a_" + j)) + assertThat(row.getField("a_" + j)) + .withFailMessage("Invalid value for row %d and column a_%d.".format(rowNumber, j)) + .isEqualTo(expectedRowValue) + } } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala index fcad3d9d6b9..b3ddc833b75 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala @@ -31,18 +31,18 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0 import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.{MyPojo, MyPojoFunc} -import org.apache.flink.table.planner.utils.{HierarchyTableFunction, PojoTableFunc, RichTableFunc1, RichTableFuncWithFinish, TableFunc0, TableFunc1, TableFunc2, TableFunc3, VarArgsFunc0} +import org.apache.flink.table.planner.utils._ import org.apache.flink.table.planner.utils.DateTimeTestUtil._ import org.apache.flink.table.runtime.typeutils.StringDataTypeInfo import org.apache.flink.types.Row -import org.junit.{Before, Test} +import org.junit.jupiter.api.{BeforeEach, Test} import scala.collection.JavaConversions._ class CorrelateITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("inputT", TableFunctionITCase.testData, type3, "a, b, c") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala index 892150d463d..4b0676c8d8b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase2.scala @@ -24,13 +24,11 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase._ import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit import org.apache.flink.table.planner.runtime.utils.TestData._ -import org.junit.{Before, Test} - -import scala.collection.Seq +import org.junit.jupiter.api.{BeforeEach, Test} class CorrelateITCase2 extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("inputT", TableFunctionITCase.testData, type3, "a, b, c") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala index 11aafa004a2..054022fd590 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyLimitITCase.scala @@ -18,16 +18,15 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.table.api.TableSchema -import org.apache.flink.table.api.ValidationException import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.planner.utils.TestLegacyLimitableTableSource -import org.junit._ +import org.junit.jupiter.api.{BeforeEach, Test} class LegacyLimitITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala index 4ef34d8fdce..0ce2146be49 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala @@ -20,16 +20,16 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.{DataTypes, TableSchema, Types} -import org.apache.flink.table.api.config.TableConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.{createFileInTempFolder, createTempFolder} import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row -import org.apache.flink.table.planner.utils.{TableTestUtil, TestDataTypeTableSource, TestFileInputFormatTableSource, TestInputFormatTableSource, TestLegacyFilterableTableSource, TestLegacyProjectableTableSource, TestNestedProjectableTableSource, TestPartitionableSourceFactory, TestTableSourceSinks} +import org.apache.flink.table.planner.utils._ import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter import org.apache.flink.types.Row -import org.junit.{Assert, Before, Test} +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{BeforeEach, Test} import java.io.FileWriter import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} @@ -40,7 +40,7 @@ import scala.collection.mutable class LegacyTableSourceITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() env.setParallelism(1) // set sink parallelism to 1 @@ -373,6 +373,6 @@ class LegacyTableSourceITCase extends BatchTestBase { val result = TableTestUtil.readFromFile(resultPath) val expected = Seq("31,31,31.0", "32,32,32.0", "32,32,32.0") - Assert.assertEquals(expected.sorted, result.sorted) + assertThat(expected.sorted).isEqualTo(result.sorted) } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala index 68c5d5a468f..0ed5587a13e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/Limit0RemoveITCase.scala @@ -21,15 +21,13 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData.numericType -import org.junit.{Before, Test} +import org.junit.jupiter.api.{BeforeEach, Test} import java.math.{BigDecimal => JBigDecimal} -import scala.collection.Seq - class Limit0RemoveITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() lazy val numericData = Seq( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala index edf0a0b0b4e..7dcd5de370a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala @@ -21,7 +21,11 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} import org.apache.flink.table.planner.runtime.utils.TestData.{data3, nullablesOfData3, type3} +import org.junit.jupiter.api.BeforeEach + class LimitITCase extends LegacyLimitITCase { + + @BeforeEach override def before(): Unit = { BatchTestBase.configForMiniCluster(tableConfig) registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala index 648a61c08a5..fd5a5a4343e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM import org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCaseHelper import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.SortMergeJoin @@ -28,9 +29,8 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData.{buildInData, buildInType} import org.apache.flink.types.Row -import org.junit.{Before, Test} - -import scala.collection.Seq +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.{BeforeEach, Test} /** Misc tests. */ class MiscITCase extends BatchTestBase { @@ -39,7 +39,7 @@ class MiscITCase extends BatchTestBase { private var newTableId = 0 - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("testTable", buildInData, buildInType, "a,b,c,d,e,f,g,h,i,j") @@ -528,82 +528,97 @@ class MiscITCase extends BatchTestBase { Seq(row(false, 1), row(true, 3))) } - @Test(expected = classOf[org.apache.flink.table.api.ValidationException]) + @Test def testTableGenerateFunction(): Unit = { - checkResult( - "SELECT f, g, v FROM testTable," + - "LATERAL TABLE(STRING_SPLIT(f, ' ')) AS T(v)", - Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", null, "fg")) - ) + assertThatThrownBy( + () => + checkResult( + "SELECT f, g, v FROM testTable," + + "LATERAL TABLE(STRING_SPLIT(f, ' ')) AS T(v)", + Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", null, "fg")) + )).isInstanceOf(classOf[ValidationException]) // BuildInFunctions in SQL is case insensitive - checkResult( - "SELECT f, g, v FROM testTable," + - "LATERAL TABLE(sTRING_sPLIT(f, ' ')) AS T(v)", - Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", null, "fg")) - ) - - checkResult( - "SELECT f, g, v FROM testTable," + - "LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v)", - Seq( - row("abcd", "f%g", 0), - row(null, "hij_k", 0), - row(null, "hij_k", 1), - row("e fg", null, 0), - row("e fg", null, 1), - row("e fg", null, 2)) - ) - - checkResult( - "SELECT f, g, v FROM testTable," + - "LATERAL TABLE(JSON_TUPLE('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}'," + - "'a1', f)) AS T(v)", - Seq( - row("abcd", "f%g", "b1"), - row("abcd", "f%g", null), - row(null, "hij_k", "b1"), - row(null, "hij_k", null), - row("e fg", null, "b1"), - row("e fg", null, "b3")) - ) - - checkResult( - "SELECT f, g, v FROM " + - "testTable JOIN LATERAL TABLE(JSON_TUPLE" + - "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS T(v) " + - "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = CHAR_LENGTH(v) + 3", - Seq( - row("abcd", "f%g", "b1"), - row(null, "hij_k", "b1"), - row("e fg", null, "b1"), - row("e fg", null, "b3")) - ) - - checkResult( - "SELECT f, g, v FROM " + - "testTable JOIN LATERAL TABLE(JSON_TUPLE" + - "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS T(v) " + - "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = CHAR_LENGTH(v) + 3", - Seq( - row("abcd", "f%g", "b1"), - row(null, "hij_k", "b1"), - row("e fg", null, "b1"), - row("e fg", null, "b3")) - ) + assertThatThrownBy( + () => + checkResult( + "SELECT f, g, v FROM testTable," + + "LATERAL TABLE(sTRING_sPLIT(f, ' ')) AS T(v)", + Seq(row("abcd", "f%g", "abcd"), row("e fg", null, "e"), row("e fg", null, "fg")) + )).isInstanceOf(classOf[ValidationException]) + + assertThatThrownBy( + () => + checkResult( + "SELECT f, g, v FROM testTable," + + "LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v)", + Seq( + row("abcd", "f%g", 0), + row(null, "hij_k", 0), + row(null, "hij_k", 1), + row("e fg", null, 0), + row("e fg", null, 1), + row("e fg", null, 2)) + )).isInstanceOf(classOf[ValidationException]) + + assertThatThrownBy( + () => + checkResult( + "SELECT f, g, v FROM testTable," + + "LATERAL TABLE(JSON_TUPLE('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}'," + + "'a1', f)) AS T(v)", + Seq( + row("abcd", "f%g", "b1"), + row("abcd", "f%g", null), + row(null, "hij_k", "b1"), + row(null, "hij_k", null), + row("e fg", null, "b1"), + row("e fg", null, "b3")) + )).isInstanceOf(classOf[ValidationException]) + + assertThatThrownBy( + () => + checkResult( + "SELECT f, g, v FROM " + + "testTable JOIN LATERAL TABLE(JSON_TUPLE" + + "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS T(v) " + + "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = CHAR_LENGTH(v) + 3", + Seq( + row("abcd", "f%g", "b1"), + row(null, "hij_k", "b1"), + row("e fg", null, "b1"), + row("e fg", null, "b3")) + )).isInstanceOf(classOf[ValidationException]) + + assertThatThrownBy( + () => + checkResult( + "SELECT f, g, v FROM " + + "testTable JOIN LATERAL TABLE(JSON_TUPLE" + + "('{\"a1\": \"b1\", \"a2\": \"b2\", \"e fg\": \"b3\"}', 'a1', f)) AS T(v) " + + "ON CHAR_LENGTH(f) = CHAR_LENGTH(v) + 2 OR CHAR_LENGTH(g) = CHAR_LENGTH(v) + 3", + Seq( + row("abcd", "f%g", "b1"), + row(null, "hij_k", "b1"), + row("e fg", null, "b1"), + row("e fg", null, "b3")) + )).isInstanceOf(classOf[ValidationException]) } /** * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the join * predicate can only be empty or literal true (the restriction should be removed in FLINK-7865). */ - @Test(expected = classOf[org.apache.flink.table.api.ValidationException]) + @Test def testTableGenerateFunctionLeftJoin(): Unit = { - checkResult( - "SELECT f, g, v FROM " + - "testTable LEFT OUTER JOIN LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v) " + - "ON LENGTH(f) = v + 2 OR LENGTH(g) = v + 4", - Seq(row(null, "hij_k", 1), row("e fg", null, 2)) - ) + assertThatThrownBy( + () => + checkResult( + "SELECT f, g, v FROM " + + "testTable LEFT OUTER JOIN LATERAL TABLE(GENERATE_SERIES(0, CAST(b AS INTEGER))) AS T(v) " + + "ON LENGTH(f) = v + 2 OR LENGTH(g) = v + 4", + Seq(row(null, "hij_k", 1), row("e fg", null, 2)) + )).isInstanceOf(classOf[ValidationException]) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala index f25d2d96ec9..e68b6b63010 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala @@ -25,12 +25,11 @@ import org.apache.flink.configuration.JobManagerOptions.SchedulerType import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.runtime.utils.BatchTestBase +import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row -import org.junit.{Before, Test} -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.runners.Parameterized.Parameters +import org.junit.jupiter.api.{BeforeEach, TestTemplate} +import org.junit.jupiter.api.extension.ExtendWith import scala.collection.JavaConversions._ import scala.util.Random @@ -42,11 +41,15 @@ import scala.util.Random * IT cases are picked from * [[org.apache.flink.table.planner.plan.batch.sql.MultipleInputCreationTest]]. */ -@RunWith(classOf[Parameterized]) -class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: SchedulerType) - extends BatchTestBase { +@ExtendWith(Array(classOf[ParameterizedTestExtension])) +class MultipleInputITCase extends BatchTestBase { - @Before + @Parameter var shuffleMode: BatchShuffleMode = _ + + @Parameter(value = 1) + var schedulerType: SchedulerType = _ + + @BeforeEach override def before(): Unit = { super.before() @@ -79,7 +82,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule tEnv.getConfig.set(JobManagerOptions.SCHEDULER, schedulerType) } - @Test + @TestTemplate def testBasicMultipleInput(): Unit = { checkMultipleInputResult(""" |SELECT * FROM @@ -90,7 +93,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule |""".stripMargin) } - @Test + @TestTemplate def testManyMultipleInputs(): Unit = { checkMultipleInputResult( """ @@ -116,7 +119,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule |""".stripMargin) } - @Test + @TestTemplate def testJoinWithAggAsProbe(): Unit = { checkMultipleInputResult( """ @@ -130,7 +133,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule ) } - @Test + @TestTemplate def testNoPriorityConstraint(): Unit = { checkMultipleInputResult( """ @@ -141,7 +144,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule ) } - @Test + @TestTemplate def testRelatedInputs(): Unit = { checkMultipleInputResult( """ @@ -157,7 +160,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule ) } - @Test + @TestTemplate def testRelatedInputsWithAgg(): Unit = { checkMultipleInputResult( """ @@ -173,7 +176,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule ) } - @Test + @TestTemplate def testDeadlockCausedByExchangeInAncestor(): Unit = { checkMultipleInputResult( """ @@ -185,7 +188,7 @@ class MultipleInputITCase(shuffleMode: BatchShuffleMode, schedulerType: Schedule ) } - @Test + @TestTemplate def testMaxSupportedInputs(): Unit = { val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO) val data = Seq(BatchTestBase.row(1, "test")) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala index 979fa83e8e3..250ba36a6f8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.planner.runtime.batch.sql -import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO, INT_TYPE_INFO, SHORT_TYPE_INFO, STRING_TYPE_INFO} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1} import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} @@ -32,17 +32,16 @@ import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils import org.apache.flink.table.planner.utils.DateTimeTestUtil._ import org.apache.flink.types.Row -import org.junit.{Before, Test} +import org.junit.jupiter.api.{BeforeEach, Test} import java.lang.{Iterable => JIterable, Long => JLong} import java.util.{Collections, Optional} -import scala.collection.Seq import scala.util.Random class OverAggregateITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("Table5", data5, type5, "d, e, f, g, h", nullablesOfData5) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala index a16b7c6d540..8e4d4ce5b72 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.configuration.{BatchExecutionOptions, Configuration} import org.apache.flink.connector.file.table.FileSystemConnectorOptions +import org.apache.flink.core.testutils.EachCallbackWrapper import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.table.api.{Schema, TableEnvironment, TableException, TableSchema, ValidationException} @@ -31,39 +32,33 @@ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR import org.apache.flink.table.descriptors.DescriptorProperties import org.apache.flink.table.descriptors.Schema.SCHEMA import org.apache.flink.table.factories.TableSinkFactory -import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4, type_int_string, _} +import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4, _} import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink} import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType} import org.apache.flink.table.types.utils.TypeConversions -import org.apache.flink.table.utils.LegacyRowResource +import org.apache.flink.table.utils.LegacyRowExtension import org.apache.flink.types.Row -import org.junit.{Before, Rule, Test} -import org.junit.Assert._ -import org.junit.rules.ExpectedException +import org.assertj.core.api.Assertions.{assertThat, assertThatIterable, assertThatThrownBy} +import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.extension.RegisterExtension import java.util import java.util.{function, ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import scala.collection.Seq /** Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]]. */ class PartitionableSinkITCase extends BatchTestBase { - private val _expectedException = ExpectedException.none + @RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] = + new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension) - @Rule - def expectedEx: ExpectedException = _expectedException - - @Rule - def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE - - @Before + @BeforeEach override def before(): Unit = { super.before() env.setParallelism(3) @@ -83,9 +78,10 @@ class PartitionableSinkITCase extends BatchTestBase { "insert into sinkTable select a, max(b), c" + " from nonSortTable group by a, c") .await() - assertEquals(List("1,5,Hi", "1,5,Hi01", "1,5,Hi02"), RESULT1.sorted) - assert(RESULT2.isEmpty) - assertEquals( + assertThatIterable(RESULT1).containsExactlyInAnyOrderElementsOf( + List("1,5,Hi", "1,5,Hi01", "1,5,Hi02")) + assertThat(RESULT2.isEmpty()).isTrue + assertThatIterable(RESULT3).containsExactlyInAnyOrderElementsOf( List( "2,1,Hello world01", "2,1,Hello world02", @@ -98,34 +94,33 @@ class PartitionableSinkITCase extends BatchTestBase { "3,2,Hello02", "3,2,Hello03", "3,2,Hello04" - ), - RESULT3.sorted - ) + )) } @Test def testInsertWithPartitionGrouping(): Unit = { registerTableSink() tEnv.executeSql("insert into sinkTable select a, b, c from sortTable").await() - assertEquals(List("1,1,Hello world", "1,1,Hello world, how are you?"), RESULT1.toList) - assertEquals( - List("4,4,你好,陌生人", "4,4,你好,陌生人,我是", "4,4,你好,陌生人,我是中国人", "4,4,你好,陌生人,我是中国人,你来自哪里?"), - RESULT2.toList) - assertEquals( + assertThatIterable(RESULT1).containsExactlyElementsOf( + List("1,1,Hello world", "1,1,Hello world, how are you?")) + assertThatIterable(RESULT2).containsExactlyElementsOf( + List("4,4,你好,陌生人", "4,4,你好,陌生人,我是", "4,4,你好,陌生人,我是中国人", "4,4,你好,陌生人,我是中国人,你来自哪里?")) + assertThatIterable(RESULT3).containsExactlyElementsOf( List( "2,2,Hi", "2,2,Hello", "3,3,I'm fine, thank", "3,3,I'm fine, thank you", - "3,3,I'm fine, thank you, and you?"), - RESULT3.toList) + "3,3,I'm fine, thank you, and you?")) } @Test def testInsertWithStaticPartitions(): Unit = { registerTableSink() tEnv.executeSql("insert into sinkTable partition(a=1) select b, c from sortTable").await() - assertEquals( + assertThatIterable( + RESULT1 + ).containsExactlyElementsOf( List( "1,2,Hi", "1,1,Hello world", @@ -138,51 +133,47 @@ class PartitionableSinkITCase extends BatchTestBase { "1,4,你好,陌生人,我是", "1,4,你好,陌生人,我是中国人", "1,4,你好,陌生人,我是中国人,你来自哪里?" - ), - RESULT1.toList - ) - assert(RESULT2.isEmpty) - assert(RESULT3.isEmpty) + )) + assertThat(RESULT2.isEmpty).isTrue + assertThat(RESULT3.isEmpty).isTrue } @Test def testInsertWithStaticAndDynamicPartitions(): Unit = { registerTableSink(partitionColumns = Array("a", "b")) tEnv.executeSql("insert into sinkTable partition(a=1) select b, c from sortTable").await() - assertEquals(List("1,1,Hello world", "1,1,Hello world, how are you?"), RESULT1.toList) - assertEquals( - List("1,4,你好,陌生人", "1,4,你好,陌生人,我是", "1,4,你好,陌生人,我是中国人", "1,4,你好,陌生人,我是中国人,你来自哪里?"), - RESULT2.toList) - assertEquals( + assertThatIterable(RESULT1).containsExactlyElementsOf( + List("1,1,Hello world", "1,1,Hello world, how are you?")) + assertThatIterable(RESULT2).containsExactlyElementsOf( + List("1,4,你好,陌生人", "1,4,你好,陌生人,我是", "1,4,你好,陌生人,我是中国人", "1,4,你好,陌生人,我是中国人,你来自哪里?")) + assertThatIterable(RESULT3).containsExactlyElementsOf( List( "1,2,Hi", "1,2,Hello", "1,3,I'm fine, thank", "1,3,I'm fine, thank you", - "1,3,I'm fine, thank you, and you?"), - RESULT3.toList) + "1,3,I'm fine, thank you, and you?")) } @Test def testInsertWithStaticPartitionAndStarSource(): Unit = { registerTableSink(partitionColumns = Array("b", "c")) tEnv.executeSql("insert into sinkTable partition(b=1) select * from starTable").await() - assertEquals( + assertThatIterable(RESULT1).containsExactlyElementsOf( List( "1,1,Hello world, how are you?", "3,1,I'm fine, thank you", "4,1,你好,陌生人", - "4,1,你好,陌生人,我是中国人"), - RESULT1.toList) - assertEquals(List("4,1,你好,陌生人,我是", "4,1,你好,陌生人,我是中国人,你来自哪里?"), RESULT2.toList) - assertEquals( + "4,1,你好,陌生人,我是中国人")) + assertThatIterable(RESULT2).containsExactlyElementsOf( + List("4,1,你好,陌生人,我是", "4,1,你好,陌生人,我是中国人,你来自哪里?")) + assertThatIterable(RESULT3).containsExactlyElementsOf( List( "2,1,Hello", "1,1,Hello world", "2,1,Hi", "3,1,I'm fine, thank", - "3,1,I'm fine, thank you, and you?"), - RESULT3.toList) + "3,1,I'm fine, thank you, and you?")) } @Test @@ -193,23 +184,27 @@ class PartitionableSinkITCase extends BatchTestBase { "insert into sinkTable partition(b=1)\n" + "(values (1, 'Hello world, how are you?'), (4, '你好,陌生人,我是'), (2, 'Hello'))") .await() - assertEquals(List("1,1,Hello world, how are you?"), RESULT1.toList) - assertEquals(List("4,1,你好,陌生人,我是"), RESULT2.toList) - assertEquals(List("2,1,Hello"), RESULT3.toList) + assertThatIterable(RESULT1).containsExactlyElementsOf(List("1,1,Hello world, how are you?")) + assertThatIterable(RESULT2).containsExactlyElementsOf(List("4,1,你好,陌生人,我是")) + assertThatIterable(RESULT3).containsExactlyElementsOf(List("2,1,Hello")) } @Test def testStaticPartitionNotInPartitionFields(): Unit = { - expectedEx.expect(classOf[ValidationException]) registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array("a", "b")) - tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b from sortTable").await() + assertThatThrownBy( + () => + tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b from sortTable").await()) + .isInstanceOf(classOf[ValidationException]) } @Test def testInsertStaticPartitionOnNonPartitionedSink(): Unit = { - expectedEx.expect(classOf[TableException]) registerTableSink(tableName = "sinkTable2", rowType = type4, partitionColumns = Array()) - tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b from sortTable").await() + assertThatThrownBy( + () => + tEnv.executeSql("insert into sinkTable2 partition(c=1) select a, b from sortTable").await()) + .isInstanceOf(classOf[TableException]) } private def registerTableSink( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala index 8338d3d3caf..9f657eddb72 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala @@ -19,28 +19,31 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath} import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory} -import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.createTempFolder import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.utils.TestingTableEnvironment import org.apache.flink.table.resource.{ResourceType, ResourceUri} +import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, ParameterizedTestExtension, Parameters} import org.apache.flink.util.UserClassLoaderJarTestUtils -import org.junit.{Before, Test} -import org.junit.runner.RunWith -import org.junit.runners.Parameterized +import org.junit.jupiter.api.{BeforeEach, TestTemplate} +import org.junit.jupiter.api.extension.ExtendWith import java.util import java.util.Collections import scala.collection.JavaConversions._ -@RunWith(classOf[Parameterized]) -class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatalogFilter: Boolean) - extends BatchTestBase { +@ExtendWith(Array(classOf[ParameterizedTestExtension])) +class PartitionableSourceITCase extends BatchTestBase { - @Before + @Parameter val sourceFetchPartitions: Boolean = false + + @Parameter(value = 1) + val useCatalogFilter: Boolean = false + + @BeforeEach override def before(): Unit = { super.before() @@ -129,7 +132,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal } } - @Test + @TestTemplate def testSimplePartitionFieldPredicate1(): Unit = { checkResult( "SELECT * FROM PartitionableTable WHERE part1 = 'A'", @@ -140,7 +143,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal )) } - @Test + @TestTemplate def testPartialPartitionFieldPredicatePushDown(): Unit = { checkResult( "SELECT * FROM PartitionableTable WHERE (id > 2 OR part1 = 'A') AND part2 > 1", @@ -150,7 +153,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal )) } - @Test + @TestTemplate def testUnconvertedExpression(): Unit = { checkResult( "select * from PartitionableTable where trim(part1) = 'A' and part2 > 1", @@ -159,7 +162,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal )) } - @Test + @TestTemplate def testPushDownPartitionAndFiltersContainPartitionKeys(): Unit = { checkResult( "SELECT * FROM PartitionableAndFilterableTable WHERE part1 = 'A' AND id > 1", @@ -169,7 +172,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal )) } - @Test + @TestTemplate def testPushDownPartitionAndFiltersContainPartitionKeysWithSingleProjection(): Unit = { checkResult( "SELECT name FROM PartitionableAndFilterableTable WHERE part1 = 'A' AND id > 1", @@ -179,7 +182,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal )) } - @Test + @TestTemplate def testPartitionPrunerCompileClassLoader(): Unit = { val udfJavaCode = s""" @@ -213,7 +216,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal } object PartitionableSourceITCase { - @Parameterized.Parameters(name = "sourceFetchPartitions={0}, useCatalogFilter={1}") + @Parameters(name = "sourceFetchPartitions={0}, useCatalogFilter={1}") def parameters(): util.Collection[Array[Any]] = { Seq[Array[Any]]( Array(true, false), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala index bf7324b71a2..10662856840 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala @@ -21,13 +21,11 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ -import org.junit._ - -import scala.collection.Seq +import org.junit.jupiter.api.{BeforeEach, Test} class RankITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("Table3", data3, type3, "a, b, c", nullablesOfData3) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala index 2da8e735d11..a0d951fc49b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SetOperatorsITCase.scala @@ -21,23 +21,25 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_T import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCaseHelper -import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.{JoinType, _} +import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType._ import org.apache.flink.table.planner.runtime.utils.{BatchTableEnvUtil, BatchTestBase} import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ +import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, ParameterizedTestExtension, Parameters} -import org.junit.{Before, Test} -import org.junit.runner.RunWith -import org.junit.runners.Parameterized +import org.junit.jupiter.api.{BeforeEach, TestTemplate} +import org.junit.jupiter.api.extension.ExtendWith import java.util import scala.util.Random -@RunWith(classOf[Parameterized]) -class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase { +@ExtendWith(Array(classOf[ParameterizedTestExtension])) +class SetOperatorsITCase extends BatchTestBase { - @Before + @Parameter var joinType: JoinType = _ + + @BeforeEach override def before(): Unit = { super.before() registerCollection("AllNullTable3", allNullData3, type3, "a, b, c") @@ -47,7 +49,7 @@ class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase { JoinITCaseHelper.disableOtherJoinOpForJoin(tEnv, joinType) } - @Test + @TestTemplate def testIntersect(): Unit = { val data = List( row(1, 1L, "Hi"), @@ -66,14 +68,14 @@ class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase { checkResult("SELECT c FROM SmallTable3 INTERSECT SELECT c FROM T", Seq(row("Hi"), row("Hello"))) } - @Test + @TestTemplate def testIntersectWithFilter(): Unit = { checkResult( "SELECT c FROM ((SELECT * FROM SmallTable3) INTERSECT (SELECT * FROM Table3)) WHERE a > 1", Seq(row("Hello"), row("Hello world"))) } - @Test + @TestTemplate def testExcept(): Unit = { val data = List(row(1, 1L, "Hi")) BatchTableEnvUtil.registerCollection( @@ -88,7 +90,7 @@ class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase { Seq(row("Hello"), row("Hello world"))) } - @Test + @TestTemplate def testExceptWithFilter(): Unit = { checkResult( "SELECT c FROM (" + @@ -97,24 +99,24 @@ class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase { Seq(row("Hi"))) } - @Test + @TestTemplate def testIntersectWithNulls(): Unit = { checkResult("SELECT c FROM AllNullTable3 INTERSECT SELECT c FROM AllNullTable3", Seq(row(null))) } - @Test + @TestTemplate def testExceptWithNulls(): Unit = { checkResult("SELECT c FROM AllNullTable3 EXCEPT SELECT c FROM AllNullTable3", Seq()) } - @Test + @TestTemplate def testIntersectAll(): Unit = { BatchTableEnvUtil.registerCollection(tEnv, "T1", Seq(1, 1, 1, 2, 2), "c") BatchTableEnvUtil.registerCollection(tEnv, "T2", Seq(1, 2, 2, 2, 3), "c") checkResult("SELECT c FROM T1 INTERSECT ALL SELECT c FROM T2", Seq(row(1), row(2), row(2))) } - @Test + @TestTemplate def testMinusAll(): Unit = { BatchTableEnvUtil.registerCollection(tEnv, "T2", Seq((1, 1L, "Hi")), "a, b, c") val t1 = "SELECT * FROM SmallTable3" @@ -134,13 +136,13 @@ class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase { } object SetOperatorsITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") def parameters(): util.Collection[Array[_]] = { util.Arrays.asList( // TODO -// Array(BroadcastHashJoin), -// Array(HashJoin), -// Array(NestedLoopJoin), + // Array(BroadcastHashJoin), + // Array(HashJoin), + // Array(NestedLoopJoin), Array(SortMergeJoin)) } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala index f5ffd20f68c..06f703e9f1e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/SortLimitITCase.scala @@ -22,11 +22,11 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.types.Row -import org.junit._ +import org.junit.jupiter.api.{BeforeEach, Test} class SortLimitITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() env.setParallelism(1) // set sink parallelism to 1 diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala index 2d6f0fa70f0..d170b1b3395 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableScanITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.utils.{TestTableSourceWithTime, WithoutTimeAttributesTableSource} import org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime -import org.junit.Test +import org.junit.jupiter.api.Test import java.lang.{Integer => JInt} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala index b2e048c80a6..d0fdc2ad792 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.configuration.MemorySize -import org.apache.flink.core.testutils.FlinkMatchers +import org.apache.flink.core.testutils.FlinkAssertions import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.createTempFolder @@ -27,9 +27,8 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData.smallData3 import org.apache.flink.table.planner.utils.TableTestUtil -import org.assertj.core.api.Assertions -import org.hamcrest.MatcherAssert -import org.junit.{Assert, Test} +import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} +import org.junit.jupiter.api.Test class TableSinkITCase extends BatchTestBase { @@ -69,28 +68,20 @@ class TableSinkITCase extends BatchTestBase { s"insert into MySink /*+ OPTIONS('path' = '$newPath2') */ select * from MyTable") stmtSet.execute().await() - Assert.assertTrue(TableTestUtil.readFromFile(resultPath).isEmpty) + assertThat(TableTestUtil.readFromFile(resultPath).isEmpty).isTrue val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world") val result1 = TableTestUtil.readFromFile(newPath1) - Assert.assertEquals(expected.sorted, result1.sorted) + assertThat(expected.sorted).isEqualTo(result1.sorted) val result2 = TableTestUtil.readFromFile(newPath2) - Assert.assertEquals(expected.sorted, result2.sorted) + assertThat(expected.sorted).isEqualTo(result2.sorted) } @Test def testCollectSinkConfiguration(): Unit = { tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1b")) - try { - checkResult("SELECT 1", Seq(row(1))) - Assert.fail("Expecting exception thrown from collect sink") - } catch { - case e: Exception => - MatcherAssert.assertThat( - e, - FlinkMatchers.containsMessage( - "Please consider increasing max bytes per batch value " + - "by setting collect-sink.batch-size.max")) - } + assertThatThrownBy(() => checkResult("SELECT 1", Seq(row(1)))) + .satisfies(FlinkAssertions.anyCauseMatches( + "Please consider increasing max bytes per batch value by setting collect-sink.batch-size.max")) tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb")) checkResult("SELECT 1", Seq(row(1))) @@ -125,7 +116,7 @@ class TableSinkITCase extends BatchTestBase { .await() val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world") val result = TableTestUtil.readFromFile(resultPath) - Assertions.assertThat(result.sorted).isEqualTo(expected.sorted) + assertThat(result.sorted).isEqualTo(expected.sorted) // test statement set val statementSet = tEnv.createStatementSet() @@ -142,7 +133,7 @@ class TableSinkITCase extends BatchTestBase { |""".stripMargin) statementSet.execute().await() val useStatementResult = TableTestUtil.readFromFile(useStatementResultPath) - Assertions.assertThat(useStatementResult.sorted).isEqualTo(expected.sorted) + assertThat(useStatementResult.sorted).isEqualTo(expected.sorted) } @Test @@ -161,16 +152,15 @@ class TableSinkITCase extends BatchTestBase { |) """.stripMargin) - Assertions - .assertThatThrownBy( - () => - tEnv - .executeSql(""" - |CREATE TABLE MyCtasTable - | AS - | SELECT * FROM MyTable - |""".stripMargin) - .await()) + assertThatThrownBy( + () => + tEnv + .executeSql(""" + |CREATE TABLE MyCtasTable + | AS + | SELECT * FROM MyTable + |""".stripMargin) + .await()) .hasRootCauseMessage("\nExpecting actual not to be null") } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala index 70eaea46015..e3c987fd6a2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala @@ -17,7 +17,6 @@ */ package org.apache.flink.table.planner.runtime.batch.sql -import org.apache.flink.table.catalog.ObjectPath import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} @@ -26,11 +25,12 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.utils._ import org.apache.flink.util.FileUtils -import org.junit.{Assert, Before, Test} +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{BeforeEach, Test} class TableSourceITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() env.setParallelism(1) // set sink parallelism to 1 @@ -380,9 +380,9 @@ class TableSourceITCase extends BatchTestBase { |""".stripMargin) stmtSet.execute().await() - val result = TableTestUtil.readFromFile(resultPath) - val expected = Seq("2,2,Hello", "3,2,Hello world", "3,2,Hello world") - Assert.assertEquals(expected.sorted, result.sorted) + val result = TableTestUtil.readFromFile(resultPath).sorted + val expected = List("2,2,Hello", "3,2,Hello world", "3,2,Hello world").sorted + assertThat(result).isEqualTo(expected) } @Test @@ -425,6 +425,6 @@ class TableSourceITCase extends BatchTestBase { "3,2,Hello world", "3,2,Hello world", "3,2,Hello world") - Assert.assertEquals(expected.sorted, result.sorted) + assertThat(expected.sorted).isEqualTo(result.sorted) } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala index 00b3c9a6477..fcc89579cef 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala @@ -25,9 +25,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType} -import org.junit._ - -import scala.collection.Seq +import org.junit.jupiter.api.{BeforeEach, Test} class UnionITCase extends BatchTestBase { @@ -40,7 +38,7 @@ class UnionITCase extends BatchTestBase { binaryRow(type6.toRowFieldTypes, 4, 3L, fromString("Hello world, how are you?")) ) - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection("Table3", smallData3, type3, "a, b, c", nullablesOfSmallData3) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala index bbad3d8b59e..1fefbf23230 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala @@ -24,10 +24,9 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime import org.apache.flink.types.Row -import org.junit.Test +import org.junit.jupiter.api.Test import scala.collection.JavaConverters._ -import scala.collection.Seq class UnnestITCase extends BatchTestBase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala index 4384a2d6c5b..cccc3e9dc5c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/ValuesITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row -import org.junit.Test +import org.junit.jupiter.api.Test class ValuesITCase extends BatchTestBase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala index 8bc0b75fe33..d4bcb1d735a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala @@ -25,11 +25,11 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime -import org.junit.{Before, Test} +import org.junit.jupiter.api.{BeforeEach, Test} class WindowTableFunctionITCase extends BatchTestBase { - @Before + @BeforeEach override def before(): Unit = { super.before() registerCollection(
