http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java deleted file mode 100644 index 581c8ed..0000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.batch.table; - -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.BatchTableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.table.TableEnvironment; -import org.apache.flink.api.table.ValidationException; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class SelectITCase extends TableProgramsTestBase { - - public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) { - super(mode, configMode); - } - - @Test - public void testSimpleSelectAllWithAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds, "a,b,c"); - - Table result = in - .select("a, b, c"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + - "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + - "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + - "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } - - @Test - public void testSimpleSelectWithNaming() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds); - - Table result = in - .select("f0 as a, f1 as b") - .select("a, b"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - compareResultAsText(results, expected); - } - - @Test - public void testSimpleSelectRenameAll() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds); - - Table result = in - .select("f0 as a, f1 as b, f2 as c") - .select("a, b"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testSelectInvalidField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - tableEnv.fromDataSet(ds, "a, b, c") - // Must fail. Field foo does not exist - .select("a + 1, foo + 2"); - } - - @Test(expected = ValidationException.class) - public void testSelectAmbiguousFieldNames() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - tableEnv.fromDataSet(ds, "a, b, c") - // Must fail. Field foo does not exist - .select("a + 1 as foo, b + 2 as foo"); - } - - @Test - public void testSelectStar() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds, "a,b,c"); - - Table result = in - .select("*"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + - "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + - "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + - "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala index c33e1ef..2d82dbc 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala @@ -18,6 +18,8 @@ package org.apache.flink.api.scala.batch +import java.util + import org.apache.flink.api.scala._ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode @@ -140,4 +142,131 @@ class TableEnvironmentITCase( // Must fail. Table is bound to different TableEnvironment. tEnv2.registerTable("MyTable", t1) } + + @Test + def testToTable(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env) + .toTable(tEnv, 'a, 'b, 'c) + .select('a, 'b, 'c) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testToTableFromCaseClass(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val data = List( + SomeCaseClass("Peter", 28, 4000.00, "Sales"), + SomeCaseClass("Anna", 56, 10000.00, "Engineering"), + SomeCaseClass("Lucy", 42, 6000.00, "HR")) + + val t = env.fromCollection(data) + .toTable(tEnv, 'a, 'b, 'c, 'd) + .select('a, 'b, 'c, 'd) + + val expected: String = + "Peter,28,4000.0,Sales\n" + + "Anna,56,10000.0,Engineering\n" + + "Lucy,42,6000.0,HR\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testToTableFromAndToCaseClass(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val data = List( + SomeCaseClass("Peter", 28, 4000.00, "Sales"), + SomeCaseClass("Anna", 56, 10000.00, "Engineering"), + SomeCaseClass("Lucy", 42, 6000.00, "HR")) + + val t = env.fromCollection(data) + .toTable(tEnv, 'a, 'b, 'c, 'd) + .select('a, 'b, 'c, 'd) + + val expected: String = + "SomeCaseClass(Peter,28,4000.0,Sales)\n" + + "SomeCaseClass(Anna,56,10000.0,Engineering)\n" + + "SomeCaseClass(Lucy,42,6000.0,HR)\n" + val results = t.toDataSet[SomeCaseClass].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[TableException]) + def testToTableWithToFewFields(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + CollectionDataSets.get3TupleDataSet(env) + // Must fail. Number of fields does not match. + .toTable(tEnv, 'a, 'b) + } + + @Test(expected = classOf[TableException]) + def testToTableWithToManyFields(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + CollectionDataSets.get3TupleDataSet(env) + // Must fail. Number of fields does not match. + .toTable(tEnv, 'a, 'b, 'c, 'd) + } + + @Test(expected = classOf[TableException]) + def testToTableWithAmbiguousFields(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + CollectionDataSets.get3TupleDataSet(env) + // Must fail. Field names not unique. + .toTable(tEnv, 'a, 'b, 'b) + } + + @Test(expected = classOf[TableException]) + def testToTableWithNonFieldReference1(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + // Must fail. as() can only have field references + CollectionDataSets.get3TupleDataSet(env) + .toTable(tEnv, 'a + 1, 'b, 'c) + } + + @Test(expected = classOf[TableException]) + def testToTableWithNonFieldReference2(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + // Must fail. as() can only have field references + CollectionDataSets.get3TupleDataSet(env) + .toTable(tEnv, 'a as 'foo, 'b, 'c) + } +} + +object TableEnvironmentITCase { + + @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT), + Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT)).asJava + } +} + +case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) { + def this() { this("", 0, 0.0, "") } } http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala index 407fa4c..d7e99d4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala @@ -20,14 +20,15 @@ package org.apache.flink.api.scala.batch import java.io.File -import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.TableEnvironment import org.apache.flink.api.table.sinks.CsvTableSink -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -35,8 +36,9 @@ import org.junit.runners.Parameterized @RunWith(classOf[Parameterized]) class TableSinkITCase( - mode: TestExecutionMode) - extends MultipleProgramsTestBase(mode) { + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { @Test def testBatchTableSink(): Unit = { @@ -46,7 +48,7 @@ class TableSinkITCase( val path = tmpFile.toURI.toString val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) env.setParallelism(4) val input = CollectionDataSets.get3TupleDataSet(env) http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala index 6fd0d13..08bee72 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala @@ -39,14 +39,16 @@ import org.junit.runners.Parameterized import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class TableSourceITCase(mode: TestExecutionMode) - extends MultipleProgramsTestBase(mode) { +class TableSourceITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { @Test def testBatchTableSourceTableAPI(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.registerTableSource("MyTestTable", new TestBatchTableSource()) val results = tEnv @@ -65,7 +67,7 @@ class TableSourceITCase(mode: TestExecutionMode) def testBatchTableSourceSQL(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.registerTableSource("MyTestTable", new TestBatchTableSource()) val results = tEnv.sql( @@ -100,7 +102,7 @@ class TableSourceITCase(mode: TestExecutionMode) tmpWriter.close() val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val csvTable = new CsvTableSource( tempFile.getAbsolutePath, http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala new file mode 100644 index 0000000..49a97e3 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.batch.sql + +import java.util + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class CalcITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testSelectStarFromTable(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSelectStarFromDataSet(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSimpleSelectAll(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT a, b, c FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSelectWithNaming(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" + + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT a, foo FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + tEnv.sql(sqlQuery) + } + + @Test + def testAllRejectingFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable WHERE false" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAllPassingFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable WHERE true" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnString(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnInteger(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + + "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testDisjunctivePredicate(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "21,6,Comment#15\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterWithAnd(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" + + "9,4,Comment#3\n" + "17,6,Comment#11\n" + + "19,6,Comment#13\n" + "21,6,Comment#15\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} + +object CalcITCase { + + @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT), + Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala deleted file mode 100644 index cc4da38..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.sql - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class FilterITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { - - @Test - def testAllRejectingFilter(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable WHERE false" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testAllPassingFilter(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable WHERE true" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterOnString(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterOnInteger(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + - "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + - "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testDisjunctivePredicate(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "1,1,Hi\n" + "21,6,Comment#15\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterWithAnd(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" + - "9,4,Comment#3\n" + "17,6,Comment#11\n" + - "19,6,Comment#13\n" + "21,6,Comment#15\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala deleted file mode 100644 index 07b802d..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.sql - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class SelectITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { - - @Test - def testSelectStarFromTable(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + - "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + - "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSelectStarFromDataSet(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT * FROM MyTable" - - val ds = CollectionDataSets.get3TupleDataSet(env) - tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c) - - val result = tEnv.sql(sqlQuery) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + - "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + - "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSimpleSelectAll(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT a, b, c FROM MyTable" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + - "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + - "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSelectWithNaming(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - tEnv.registerTable("MyTable", ds) - - val result = tEnv.sql(sqlQuery) - - val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" - - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ValidationException]) - def testInvalidFields(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT a, foo FROM MyTable" - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - tEnv.sql(sqlQuery) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala index 7c0cdff..16c8ece 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala @@ -19,12 +19,14 @@ package org.apache.flink.api.scala.batch.table import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -32,13 +34,16 @@ import org.junit.runners.Parameterized import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { +class AggregationsITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { @Test def testAggregationTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg) @@ -52,7 +57,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testAggregationOnNonExistingField(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) // Must fail. Field 'foo does not exist. @@ -63,7 +68,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testWorkingAggregationDataTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = env.fromElements( (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), @@ -79,7 +84,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testProjection(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = env.fromElements( (1: Byte, 1: Short), @@ -95,7 +100,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testAggregationWithArithmetic(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv) .select(('_1 + 2).avg + 2, '_2.count + 5) @@ -109,7 +114,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testAggregationWithTwoCount(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv) .select('_1.count, '_2.count) @@ -123,7 +128,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testAggregationAfterProjection(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = env.fromElements( (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), @@ -140,7 +145,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testNonWorkingAggregationDataTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = env.fromElements(("Hello", 1)).toTable(tEnv) // Must fail. Field '_1 is not a numeric type. @@ -153,7 +158,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testNoNestedAggregations(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = env.fromElements(("Hello", 1)).toTable(tEnv) // Must fail. Sum aggregation can not be chained. @@ -164,7 +169,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testSQLStyleAggregations(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) .select( @@ -184,7 +189,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa def testPojoAggregation(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val input = env.fromElements( MyWC("hello", 1), @@ -204,5 +209,196 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(mappedResult.asJava, expected) } + @Test + def testDistinct(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val distinct = ds.select('b).distinct() + + val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" + val results = distinct.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testDistinctAfterAggregate(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + val distinct = ds.groupBy('a, 'e).select('e).distinct() + + val expected = "1\n" + "2\n" + "3\n" + val results = distinct.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testGroupingOnNonExistentField(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. '_foo not a valid field + .groupBy('_foo) + .select('a.avg) + } + + @Test(expected = classOf[ValidationException]) + def testGroupingInvalidSelection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .groupBy('a, 'b) + // must fail. 'c is not a grouping key or aggregation + .select('c) + } + + @Test + def testGroupedAggregate(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .groupBy('b) + .select('b, 'a.sum) + + val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupingKeyForwardIfNotUsed(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .groupBy('b) + .select('a.sum) + + val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupNoAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env) + .toTable(tEnv, 'a, 'b, 'c) + .groupBy('b) + .select('a.sum as 'd, 'b) + .groupBy('b, 'd) + .select('b) + + val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupedAggregateWithLongKeys(): Unit = { + // This uses very long keys to force serialized comparison. + // With short keys, the normalized key is sufficient. + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = env.fromElements( + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), + ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2)) + .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c) + .groupBy('a, 'b) + .select('c.sum) + + val expected = "10\n" + "8\n" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupedAggregateWithConstant1(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .select('a, 4 as 'four, 'b) + .groupBy('four, 'a) + .select('four, 'b.sum) + + val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" + + "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" + + "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n" + val results = t.toDataSet[Row].collect() + + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupedAggregateWithConstant2(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .select('b, 4 as 'four, 'a) + .groupBy('b, 'four) + .select('four, 'a.sum) + + val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupedAggregateWithExpression(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + .groupBy('e, 'b % 3) + .select('c.min, 'e, 'a.avg, 'd.count) + + val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" + + "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testGroupedAggregateWithFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .groupBy('b) + .select('b, 'a.sum) + .where('b === 2) + + val expected = "2,5\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala index d64e414..4ffda87 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala @@ -18,14 +18,18 @@ package org.apache.flink.api.scala.batch.table +import java.util + import org.apache.flink.api.scala._ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils +import org.junit.Assert._ import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -39,6 +43,299 @@ class CalcITCase( extends TableProgramsTestBase(mode, configMode) { @Test + def testSimpleSelectAll(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSimpleSelectAllWithAs(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSimpleSelectWithNaming(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + .select('_1 as 'a, '_2 as 'b, '_1 as 'c) + .select('a, 'b) + + val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSimpleSelectRenameAll(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + .select('_1 as 'a, '_2 as 'b, '_3 as 'c) + .select('a, 'b) + + val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testSelectInvalidFieldFields(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. Field 'foo does not exist + .select('a, 'foo) + } + + @Test(expected = classOf[ValidationException]) + def testSelectAmbiguousRenaming(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. 'a and 'b are both renamed to 'foo + .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print() + } + + @Test(expected = classOf[ValidationException]) + def testSelectAmbiguousRenaming2(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. 'a and 'b are both renamed to 'a + .select('a, 'b as 'a).toDataSet[Row].print() + } + + @Test + def testSelectStar(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAliasStarException(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + .select('_1 as '*, '_2 as 'b, '_1 as 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + } + + @Test + def testAllRejectingFilter(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( Literal(false) ) + + val expected = "\n" + val results = filterDs.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAllPassingFilter(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( Literal(true) ) + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = filterDs.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnStringTupleField(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val filterDs = ds.filter( 'c.like("%world%") ) + + val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" + val results = filterDs.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnIntegerTupleField(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( 'a % 2 === 0 ) + + val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + + "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" + val results = filterDs.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testNotEquals(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( 'a % 2 !== 0) + val expected = "1,1,Hi\n" + "3,2,Hello world\n" + + "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" + + "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" + + "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testDisjunctivePredicate(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( 'a < 2 || 'a > 20) + val expected = "1,1,Hi\n" + "21,6,Comment#15\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testConsecutiveFilters(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0) + val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" + + "9,4,Comment#3\n" + "17,6,Comment#11\n" + + "19,6,Comment#13\n" + "21,6,Comment#15\n" + val results = filterDs.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterBasicType(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.getStringDataSet(env) + + val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") ) + + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + val results = filterDs.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnCustomType(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.getCustomTypeDataSet(env) + val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's) + .filter( 's.like("%a%") ) + + val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + val results = filterDs.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testFilterInvalidFieldName(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + // must fail. Field 'foo does not exist + ds.filter( 'foo === 2 ) + } + + @Test def testSimpleCalc(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -104,3 +401,13 @@ class CalcITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } } + +object CalcITCase { + + @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT), + Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala deleted file mode 100644 index 55c7944..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test - def testDistinct(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val distinct = ds.select('b).distinct() - - val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" - val results = distinct.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testDistinctAfterAggregate(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) - val distinct = ds.groupBy('a, 'e).select('e).distinct() - - val expected = "1\n" + "2\n" + "3\n" - val results = distinct.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala deleted file mode 100644 index ee0356f..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - - -@RunWith(classOf[Parameterized]) -class FilterITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { - - @Test - def testAllRejectingFilter(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( Literal(false) ) - - val expected = "\n" - val results = filterDs.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testAllPassingFilter(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( Literal(true) ) - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = filterDs.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterOnStringTupleField(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val filterDs = ds.filter( 'c.like("%world%") ) - - val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" - val results = filterDs.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterOnIntegerTupleField(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( 'a % 2 === 0 ) - - val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + - "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + - "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" - val results = filterDs.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testNotEquals(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( 'a % 2 !== 0) - val expected = "1,1,Hi\n" + "3,2,Hello world\n" + - "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" + - "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" + - "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testDisjunctivePredicate(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( 'a < 2 || 'a > 20) - val expected = "1,1,Hi\n" + "21,6,Comment#15\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testConsecutiveFilters(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0) - val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" + - "9,4,Comment#3\n" + "17,6,Comment#11\n" + - "19,6,Comment#13\n" + "21,6,Comment#15\n" - val results = filterDs.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterBasicType(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.getStringDataSet(env) - - val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") ) - - val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" - val results = filterDs.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testFilterOnCustomType(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.getCustomTypeDataSet(env) - val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's) - .filter( 's.like("%a%") ) - - val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" - val results = filterDs.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ValidationException]) - def testFilterInvalidFieldName(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - // must fail. Field 'foo does not exist - ds.filter( 'foo === 2 ) - } - -}