Repository: flink Updated Branches: refs/heads/master 1e9fff4ab -> f0d543f8c
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala index 3a89de2..1a9be93 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} +import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.junit._ @@ -97,7 +97,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testJoinNonExistingKey(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -111,7 +111,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) .select('c, 'g) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testJoinWithNonMatchingKeyTypes(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -125,7 +125,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) .select('c, 'g).collect() } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testJoinWithAmbiguousFields(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -258,7 +258,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testJoinTablesFromDifferentEnvs(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv1 = TableEnvironment.getTableEnvironment(env) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala index c6a6122..9aed5a7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -104,7 +104,7 @@ class SelectITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testSelectInvalidFieldFields(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -114,7 +114,7 @@ class SelectITCase( .select('a, 'foo) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testSelectAmbiguousRenaming(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -124,7 +124,7 @@ class SelectITCase( .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print() } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testSelectAmbiguousRenaming2(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala index e428306..f7d998b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala @@ -20,8 +20,7 @@ package org.apache.flink.api.scala.batch.table import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.codegen.CodeGenException -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.junit._ @@ -59,7 +58,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testNonWorkingSubstring1(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -71,7 +70,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT t.toDataSet[Row].collect() } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testNonWorkingSubstring2(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala index 7c109a3..29427a5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} +import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -90,7 +90,7 @@ class UnionITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testUnionDifferentFieldNames(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -102,7 +102,7 @@ class UnionITCase( ds1.unionAll(ds2) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testUnionDifferentFieldTypes(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -159,7 +159,7 @@ class UnionITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testUnionTablesFromDifferentEnvs(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv1 = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala index 56daa6b..fe606e0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala @@ -30,7 +30,7 @@ import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.runtime.FunctionCompiler -import org.apache.flink.api.table.{TableConfig, TableEnvironment} +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment} import org.mockito.Mockito._ /** @@ -82,8 +82,12 @@ object ExpressionEvaluator { } def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = { - val relBuilder = prepareTable(typeInfo)._2 - evaluate(data, typeInfo, relBuilder, expr.toRexNode(relBuilder)) + val table = prepareTable(typeInfo) + val env = table._3 + val resolvedExpr = + env.asInstanceOf[BatchTableEnvironment].scan("myTable").select(expr). + getRelNode.asInstanceOf[LogicalProject].getChildExps.get(0) + evaluate(data, typeInfo, table._2, resolvedExpr) } def evaluate( http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala index ff0e961..feda75f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala @@ -19,9 +19,9 @@ package org.apache.flink.api.scala.stream.table import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.stream.utils.{StreamTestData, StreamITCase} +import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData} import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Assert._ @@ -70,7 +70,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testUnionFieldsNameNotOverlap1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -88,7 +88,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { assertEquals(true, StreamITCase.testResults.isEmpty) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testUnionFieldsNameNotOverlap2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -108,7 +108,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { assertEquals(true, StreamITCase.testResults.isEmpty) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testUnionTablesFromDifferentEnvs(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv1 = TableEnvironment.getTableEnvironment(env) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala index e6309a2..a382447 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala @@ -50,6 +50,13 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { } @Test(expected = classOf[TableException]) + def testSort(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc) + } + + @Test(expected = classOf[TableException]) def testJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env)
