This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bf6afc3d52eeb0ec31d483a7cb9e746570116fb3 Author: Jiabao Sun <[email protected]> AuthorDate: Tue Sep 12 14:46:35 2023 +0800 [FLINK-33023][table-planner][JUnit5 Migration] Module: flink-table-planner/api (TableTestBase) --- .../validation/MatchRecognizeValidationTest.java | 188 +++++++++++---------- .../apache/flink/table/api/batch/ExplainTest.scala | 32 ++-- .../flink/table/api/stream/ExplainTest.scala | 32 ++-- .../sql/validation/OverWindowValidationTest.scala | 13 +- 4 files changed, 135 insertions(+), 130 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java index cb40481db8d..c0ca9fbf975 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java @@ -26,40 +26,40 @@ import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.Wei import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction; import org.apache.flink.table.planner.utils.TableTestBase; import org.apache.flink.table.planner.utils.TableTestUtil; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.calcite.sql.SqlMatchRecognize; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collection; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + /** Validation test for {@link SqlMatchRecognize}. */ -@RunWith(Parameterized.class) -public class MatchRecognizeValidationTest extends TableTestBase { +@ExtendWith(ParameterizedTestExtension.class) +class MatchRecognizeValidationTest extends TableTestBase { private static final String STREAM = "stream"; private static final String BATCH = "batch"; - @Parameterized.Parameter public String mode; + @Parameter private String mode; - @Parameterized.Parameters(name = "mode = {0}") - public static Collection<String> parameters() { + @Parameters(name = "mode = {0}") + private static Collection<String> parameters() { return Arrays.asList(STREAM, BATCH); } - @Rule public ExpectedException expectedException = ExpectedException.none(); - private TableTestUtil util; private TableEnvironment tEnv; - @Before - public void setup() { + @BeforeEach + void setup() { util = STREAM.equals(mode) ? streamTestUtil(TableConfig.getDefault()) @@ -86,32 +86,31 @@ public class MatchRecognizeValidationTest extends TableTestBase { + ")"); } - @After - public void after() { + @AfterEach + void after() { util.getTableEnv().executeSql("DROP TABLE Ticker"); util.getTableEnv().executeSql("DROP TABLE MyTable"); } /** Function 'MATCH_ROWTIME()' can only be used in MATCH_RECOGNIZE. */ - @Test(expected = ValidationException.class) - public void testMatchRowTimeInSelect() { + @TestTemplate + void testMatchRowTimeInSelect() { String sql = "SELECT MATCH_ROWTIME() FROM MyTable"; - util.verifyExplain(sql); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> util.verifyExplain(sql)); } /** Function 'MATCH_PROCTIME()' can only be used in MATCH_RECOGNIZE. */ - @Test(expected = ValidationException.class) - public void testMatchProcTimeInSelect() { + @TestTemplate + void testMatchProcTimeInSelect() { String sql = "SELECT MATCH_PROCTIME() FROM MyTable"; - util.verifyExplain(sql); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> util.verifyExplain(sql)); } - @Test - public void testSortProcessingTimeDesc() { + @TestTemplate + void testSortProcessingTimeDesc() { if (STREAM.equals(mode)) { - expectedException.expect(TableException.class); - expectedException.expectMessage( - "Primary sort order of a streaming table must be ascending on time."); String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -123,16 +122,16 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining( + "Primary sort order of a streaming table must be ascending on time."); } } - @Test - public void testSortProcessingTimeSecondaryField() { + @TestTemplate + void testSortProcessingTimeSecondaryField() { if (STREAM.equals(mode)) { - expectedException.expect(TableException.class); - expectedException.expectMessage( - "You must specify either rowtime or proctime for order by as the first one."); String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -144,16 +143,16 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining( + "You must specify either rowtime or proctime for order by as the first one."); } } - @Test - public void testSortNoOrder() { + @TestTemplate + void testSortNoOrder() { if (STREAM.equals(mode)) { - expectedException.expect(TableException.class); - expectedException.expectMessage( - "You must specify either rowtime or proctime for order by."); String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -164,16 +163,16 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining( + "You must specify either rowtime or proctime for order by."); } } - @Test - public void testUpdatesInUpstreamOperatorNotSupported() { + @TestTemplate + void testUpdatesInUpstreamOperatorNotSupported() { if (STREAM.equals(mode)) { - expectedException.expect(TableException.class); - expectedException.expectMessage( - "Match Recognize doesn't support consuming update changes which is produced by node GroupAggregate("); String sqlQuery = "SELECT *\n" + "FROM (SELECT DISTINCT * FROM Ticker)\n" @@ -186,14 +185,15 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining( + "Match Recognize doesn't support consuming update changes which is produced by node GroupAggregate("); } } - @Test - public void testAggregatesOnMultiplePatternVariablesNotSupported() { - expectedException.expect(ValidationException.class); - expectedException.expectMessage("SQL validation failed."); + @TestTemplate + void testAggregatesOnMultiplePatternVariablesNotSupported() { String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -205,13 +205,13 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining("SQL validation failed."); } - @Test - public void testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs() { - expectedException.expect(ValidationException.class); - expectedException.expectMessage("Aggregation must be applied to a single pattern variable"); + @TestTemplate + void testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs() { util.addTemporarySystemFunction("weightedAvg", new WeightedAvg()); String sqlQuery = "SELECT *\n" @@ -224,13 +224,13 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining("Aggregation must be applied to a single pattern variable"); } - @Test - public void testValidatingAmbiguousColumns() { - expectedException.expect(ValidationException.class); - expectedException.expectMessage("Columns ambiguously defined: {symbol, price}"); + @TestTemplate + void testValidatingAmbiguousColumns() { String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -244,7 +244,9 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining("Columns ambiguously defined: {symbol, price}"); } // *************************************************************************************** @@ -253,11 +255,8 @@ public class MatchRecognizeValidationTest extends TableTestBase { // *************************************************************************************** /** Python Function can not be used in MATCH_RECOGNIZE for now. */ - @Test - public void testMatchPythonFunction() { - expectedException.expect(TableException.class); - expectedException.expectMessage( - "Python Function can not be used in MATCH_RECOGNIZE for now."); + @TestTemplate + void testMatchPythonFunction() { util.addTemporarySystemFunction("pyFunc", new PythonScalarFunction("pyFunc")); String sql = "SELECT T.aa as ta\n" @@ -272,13 +271,14 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " A AS a = 1,\n" + " B AS b = 'b'\n" + ") AS T"; - util.verifyExplain(sql); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sql)) + .withMessageContaining( + "Python Function can not be used in MATCH_RECOGNIZE for now."); } - @Test - public void testAllRowsPerMatch() { - expectedException.expect(TableException.class); - expectedException.expectMessage("All rows per match mode is not supported yet."); + @TestTemplate + void testAllRowsPerMatch() { String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -291,15 +291,13 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining("All rows per match mode is not supported yet."); } - @Test - public void testGreedyQuantifierAtTheEndIsNotSupported() { - expectedException.expect(TableException.class); - expectedException.expectMessage( - "Greedy quantifiers are not allowed as the last element of a " - + "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier."); + @TestTemplate + void testGreedyQuantifierAtTheEndIsNotSupported() { String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -311,15 +309,15 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining( + "Greedy quantifiers are not allowed as the last element of a " + + "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier."); } - @Test - public void testPatternsProducingEmptyMatchesAreNotSupported() { - expectedException.expect(TableException.class); - expectedException.expectMessage( - "Patterns that can produce empty matches are not supported. " - + "There must be at least one non-optional state."); + @TestTemplate + void testPatternsProducingEmptyMatchesAreNotSupported() { String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -331,13 +329,15 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(TableException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining( + "Patterns that can produce empty matches are not supported. " + + "There must be at least one non-optional state."); } - @Test - public void testDistinctAggregationsNotSupported() { - expectedException.expect(ValidationException.class); - expectedException.expectMessage("SQL validation failed."); + @TestTemplate + void testDistinctAggregationsNotSupported() { String sqlQuery = "SELECT *\n" + "FROM Ticker\n" @@ -349,6 +349,8 @@ public class MatchRecognizeValidationTest extends TableTestBase { + " DEFINE\n" + " A AS A.symbol = 'a'\n" + ") AS T"; - tEnv.executeSql(sqlQuery); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining("SQL validation failed."); } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala index e6ed0bce82f..715bee95477 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala @@ -23,12 +23,12 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.planner.utils.TableTestBase import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType} +import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} -import org.junit.{Before, Test} -import org.junit.runner.RunWith -import org.junit.runners.Parameterized +import org.junit.jupiter.api.{BeforeEach, TestTemplate} +import org.junit.jupiter.api.extension.ExtendWith -@RunWith(classOf[Parameterized]) +@ExtendWith(Array(classOf[ParameterizedTestExtension])) class ExplainTest(extended: Boolean) extends TableTestBase { private val extraDetails = if (extended) { @@ -46,33 +46,33 @@ class ExplainTest(extended: Boolean) extends TableTestBase { val LONG = new BigIntType() val INT = new IntType() - @Before + @BeforeEach def before(): Unit = { util.tableEnv.getConfig .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(4)) } - @Test + @TestTemplate def testExplainWithTableSourceScan(): Unit = { util.verifyExplain("SELECT * FROM MyTable", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithDataStreamScan(): Unit = { util.verifyExplain("SELECT * FROM MyTable1", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithFilter(): Unit = { util.verifyExplain("SELECT * FROM MyTable1 WHERE mod(a, 2) = 0", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithAgg(): Unit = { util.verifyExplain("SELECT COUNT(*) FROM MyTable1 GROUP BY a", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithJoin(): Unit = { // TODO support other join operators when them are supported util.tableEnv.getConfig @@ -80,24 +80,24 @@ class ExplainTest(extended: Boolean) extends TableTestBase { util.verifyExplain("SELECT a, b, c, e, f FROM MyTable1, MyTable2 WHERE a = d", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithUnion(): Unit = { util.verifyExplain("SELECT * FROM MyTable1 UNION ALL SELECT * FROM MyTable2", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithSort(): Unit = { util.verifyExplain("SELECT * FROM MyTable1 ORDER BY a LIMIT 5", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithSingleSink(): Unit = { val table = util.tableEnv.sqlQuery("SELECT * FROM MyTable1 WHERE a > 10") val sink = util.createCollectTableSink(Array("a", "b", "c"), Array(INT, LONG, STRING)) util.verifyExplainInsert(table, sink, "sink", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithMultiSinks(): Unit = { val stmtSet = util.tableEnv.createStatementSet() val table = util.tableEnv.sqlQuery("SELECT a, COUNT(*) AS cnt FROM MyTable1 GROUP BY a") @@ -116,7 +116,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase { util.verifyExplain(stmtSet, extraDetails: _*) } - @Test + @TestTemplate def testExplainMultipleInput(): Unit = { util.tableEnv.getConfig .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin") @@ -133,7 +133,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase { } object ExplainTest { - @Parameterized.Parameters(name = "extended={0}") + @Parameters(name = "extended={0}") def parameters(): java.util.Collection[Boolean] = { java.util.Arrays.asList(true, false) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala index 39d2326e332..8fce44b2dac 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala @@ -23,15 +23,15 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.planner.utils.TableTestBase import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType} +import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} -import org.junit.{Before, Test} -import org.junit.runner.RunWith -import org.junit.runners.Parameterized +import org.junit.jupiter.api.{BeforeEach, TestTemplate} +import org.junit.jupiter.api.extension.ExtendWith import java.sql.Timestamp import java.time.Duration -@RunWith(classOf[Parameterized]) +@ExtendWith(Array(classOf[ParameterizedTestExtension])) class ExplainTest(extended: Boolean) extends TableTestBase { private val extraDetails = if (extended) { @@ -49,55 +49,55 @@ class ExplainTest(extended: Boolean) extends TableTestBase { val LONG = new BigIntType() val INT = new IntType() - @Before + @BeforeEach def before(): Unit = { util.tableEnv.getConfig .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(4)) } - @Test + @TestTemplate def testExplainTableSourceScan(): Unit = { util.verifyExplain("SELECT * FROM MyTable", extraDetails: _*) } - @Test + @TestTemplate def testExplainDataStreamScan(): Unit = { util.verifyExplain("SELECT * FROM MyTable1", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithFilter(): Unit = { util.verifyExplain("SELECT * FROM MyTable1 WHERE mod(a, 2) = 0", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithAgg(): Unit = { util.verifyExplain("SELECT COUNT(*) FROM MyTable1 GROUP BY a", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithJoin(): Unit = { util.verifyExplain("SELECT a, b, c, e, f FROM MyTable1, MyTable2 WHERE a = d", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithUnion(): Unit = { util.verifyExplain("SELECT * FROM MyTable1 UNION ALL SELECT * FROM MyTable2", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithSort(): Unit = { util.verifyExplain("SELECT * FROM MyTable1 ORDER BY a LIMIT 5", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithSingleSink(): Unit = { val table = util.tableEnv.sqlQuery("SELECT * FROM MyTable1 WHERE a > 10") val appendSink = util.createAppendTableSink(Array("a", "b", "c"), Array(INT, LONG, STRING)) util.verifyExplainInsert(table, appendSink, "appendSink", extraDetails: _*) } - @Test + @TestTemplate def testExplainWithMultiSinks(): Unit = { val stmtSet = util.tableEnv.createStatementSet() val table = util.tableEnv.sqlQuery("SELECT a, COUNT(*) AS cnt FROM MyTable1 GROUP BY a") @@ -120,7 +120,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase { util.verifyExplain(stmtSet, extraDetails: _*) } - @Test + @TestTemplate def testMiniBatchIntervalInfer(): Unit = { val stmtSet = util.tableEnv.createStatementSet() // Test emit latency propagate among RelNodeBlocks @@ -176,7 +176,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase { } object ExplainTest { - @Parameterized.Parameters(name = "extended={0}") + @Parameters(name = "extended={0}") def parameters(): java.util.Collection[Boolean] = { java.util.Arrays.asList(true, false) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala index 60a7e247891..e171ab8c56b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala @@ -24,7 +24,8 @@ import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions. import org.apache.flink.table.planner.utils.TableTestBase import org.apache.flink.types.Row -import org.junit.Test +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.jupiter.api.Test class OverWindowValidationTest extends TableTestBase { @@ -32,7 +33,7 @@ class OverWindowValidationTest extends TableTestBase { streamUtil.addDataStream[(Int, String, Long)]("T1", 'a, 'b, 'c, 'proctime.proctime) /** All aggregates must be computed on the same window. */ - @Test(expected = classOf[TableException]) + @Test def testMultiWindow(): Unit = { val sqlQuery = "SELECT " + @@ -41,16 +42,18 @@ class OverWindowValidationTest extends TableTestBase { "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " + "from T1" - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] + assertThatExceptionOfType(classOf[TableException]) + .isThrownBy(() => streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]) } /** OVER clause is necessary for [[OverAgg0]] window function. */ - @Test(expected = classOf[ValidationException]) + @Test def testInvalidOverAggregation(): Unit = { streamUtil.addFunction("overAgg", new OverAgg0) val sqlQuery = "SELECT overAgg(c, a) FROM MyTable" - streamUtil.tableEnv.sqlQuery(sqlQuery) + assertThatExceptionOfType(classOf[ValidationException]) + .isThrownBy(() => streamUtil.tableEnv.sqlQuery(sqlQuery)) } }
