[FLINK-5084] [table] Replace Java Table API integration tests by unit tests
This closes #2977. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/649cf054 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/649cf054 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/649cf054 Branch: refs/heads/master Commit: 649cf054e692ea7ffba3125377300fa0496908e7 Parents: 614acc3 Author: mtunique <[email protected]> Authored: Fri Dec 9 13:23:36 2016 +0800 Committer: twalthr <[email protected]> Committed: Tue Jan 10 11:54:23 2017 +0100 ---------------------------------------------------------------------- .../flink/table/api/java/batch/ExplainTest.java | 160 -------- .../java/batch/table/AggregationsITCase.java | 380 ------------------ .../table/api/java/batch/table/CalcITCase.java | 324 ---------------- .../api/java/batch/table/CastingITCase.java | 140 ------- .../table/api/java/batch/table/JoinITCase.java | 207 ---------- .../table/api/scala/batch/sql/CalcITCase.scala | 2 +- .../scala/batch/table/AggregationsITCase.scala | 74 +--- .../api/scala/batch/table/CalcITCase.scala | 101 ++--- .../api/scala/batch/table/CastingITCase.scala | 104 +++++ .../api/scala/batch/table/JoinITCase.scala | 113 +----- .../scala/batch/table/SetOperatorsITCase.scala | 92 +---- .../api/scala/batch/table/SortITCase.scala | 11 - .../AggregationsStringExpressionTest.scala | 342 ++++++++++++++++ .../stringexpr/CalcStringExpressionTest.scala | 386 +++++++++++++++++++ .../CastingStringExpressionTest.scala | 121 ++++++ .../stringexpr/JoinStringExpressionTest.scala | 276 +++++++++++++ .../validation/AggregationsValidationTest.scala | 139 +++++++ .../table/validation/CalcValidationTest.scala | 138 +++++++ .../table/validation/JoinValidationTest.scala | 188 +++++++++ .../validation/SetOperatorsValidationTest.scala | 119 ++++++ .../table/validation/SortValidationTest.scala | 47 +++ .../batch/utils/LogicalPlanFormatUtils.scala | 35 ++ 22 files changed, 1931 insertions(+), 1568 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java deleted file mode 100644 index 114579f..0000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.api.java.batch; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; - -import java.io.File; -import java.util.Scanner; - -import static org.junit.Assert.assertEquals; - -public class ExplainTest extends MultipleProgramsTestBase { - - public ExplainTest() { - super(TestExecutionMode.CLUSTER); - } - - private static String testFilePath = ExplainTest.class.getResource("/").getFile(); - - @Test - public void testFilterWithoutExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d")); - Table table = tableEnv - .fromDataSet(input, "a, b") - .filter("a % 2 = 0"); - - String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n"); - try (Scanner scanner = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testFilter0.out"))){ - String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); - assertEquals(source, result); - } - } - - @Test - public void testFilterWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d")); - Table table = tableEnv - .fromDataSet(input, "a, b") - .filter("a % 2 = 0"); - - String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n"); - try (Scanner scanner = new Scanner(new File(testFilePath + - 
"../../src/test/scala/resources/testFilter1.out"))){ - String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); - assertEquals(source, result); - } - } - - @Test - public void testJoinWithoutExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d")); - DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d")); - Table table1 = tableEnv.fromDataSet(input1, "a, b"); - Table table2 = tableEnv.fromDataSet(input2, "c, d"); - Table table = table1 - .join(table2) - .where("b = d") - .select("a, c"); - - String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n"); - try (Scanner scanner = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testJoin0.out"))){ - String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); - assertEquals(source, result); - } - } - - @Test - public void testJoinWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d")); - DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d")); - Table table1 = tableEnv.fromDataSet(input1, "a, b"); - Table table2 = tableEnv.fromDataSet(input2, "c, d"); - Table table = table1 - .join(table2) - .where("b = d") - .select("a, c"); - - String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n"); - try (Scanner scanner = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testJoin1.out"))){ - String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); - assertEquals(source, result); - } - } - - @Test - public void testUnionWithoutExtended() 
throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d")); - DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d")); - Table table1 = tableEnv.fromDataSet(input1, "count, word"); - Table table2 = tableEnv.fromDataSet(input2, "count, word"); - Table table = table1.unionAll(table2); - - String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n"); - try (Scanner scanner = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testUnion0.out"))){ - String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); - assertEquals(source, result); - } - } - - @Test - public void testUnionWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d")); - DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d")); - Table table1 = tableEnv.fromDataSet(input1, "count, word"); - Table table2 = tableEnv.fromDataSet(input2, "count, word"); - Table table = table1.unionAll(table2); - - String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n"); - try (Scanner scanner = new Scanner(new File(testFilePath + - "../../src/test/scala/resources/testUnion1.out"))){ - String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n"); - assertEquals(source, result); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java deleted file mode 100644 index d37ebb5..0000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.api.java.batch.table; - -import java.io.Serializable; -import java.util.List; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.types.Row; -import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.examples.java.WordCountTable.WC; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class AggregationsITCase extends TableProgramsTestBase { - - public AggregationsITCase(TestExecutionMode mode, TableConfigMode configMode){ - super(mode, configMode); - } - - @Test - public void testAggregationTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env)); - - Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "231,1,21,21,11"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void 
testAggregationOnNonExistingField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env)); - - Table result = - table.select("foo.avg"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test - public void testWorkingAggregationDataTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = - env.fromElements( - new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), - new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao")); - - Table table = tableEnv.fromDataSet(input); - - Table result = - table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,1,1,1.5,1.5,2"; - compareResultAsText(results, expected); - } - - @Test - public void testAggregationWithArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple2<Float, String>> input = - env.fromElements( - new Tuple2<>(1f, "Hello"), - new Tuple2<>(2f, "Ciao")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - table.select("(f0 + 2).avg + 2, f1.count + 5"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "5.5,7"; - compareResultAsText(results, expected); - } - - @Test - public void 
testAggregationWithTwoCount() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple2<Float, String>> input = - env.fromElements( - new Tuple2<>(1f, "Hello"), - new Tuple2<>(2f, "Ciao")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - table.select("f0.count, f1.count"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testNonWorkingDataTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - // Must fail. Cannot compute SUM aggregate on String field. - table.select("f1.sum"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testNoNestedAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = - // Must fail. Aggregation on aggregation not allowed. 
- table.select("f0.sum.sum"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testGroupingOnNonExistentField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - tableEnv - .fromDataSet(input, "a, b, c") - // must fail. Field foo is not in input - .groupBy("foo") - .select("a.avg"); - } - - @Test(expected = ValidationException.class) - public void testGroupingInvalidSelection() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - - tableEnv - .fromDataSet(input, "a, b, c") - .groupBy("a, b") - // must fail. 
Field c is not a grouping key or aggregation - .select("c"); - } - - @Test - public void testGroupedAggregate() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .groupBy("b").select("b, a.sum"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; - compareResultAsText(results, expected); - } - - @Test - public void testGroupingKeyForwardIfNotUsed() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .groupBy("b").select("a.sum"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"; - compareResultAsText(results, expected); - } - - @Test - public void testGroupNoAggregation() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + 
"6\n"; - List<Row> results = ds.collect(); - compareResultAsText(results, expected); - } - - @Test - public void testPojoAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - DataSet<WC> input = env.fromElements( - new WC("Hello", 1), - new WC("Ciao", 1), - new WC("Hello", 1), - new WC("Hola", 1), - new WC("Hola", 1)); - - Table table = tableEnv.fromDataSet(input); - - Table filtered = table - .groupBy("word") - .select("word.count as frequency, word") - .filter("frequency = 2"); - - List<String> result = tableEnv.toDataSet(filtered, WC.class) - .map(new MapFunction<WC, String>() { - public String map(WC value) throws Exception { - return value.word; - } - }).collect(); - String expected = "Hello\n" + "Hola"; - compareResultAsText(result, expected); - } - - @Test - public void testPojoGrouping() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<String, Double, String>> data = env.fromElements( - new Tuple3<>("A", 23.0, "Z"), - new Tuple3<>("A", 24.0, "Y"), - new Tuple3<>("B", 1.0, "Z")); - - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - Table table = tableEnv - .fromDataSet(data, "groupMe, value, name") - .select("groupMe, value, name") - .where("groupMe != 'B'"); - - DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class); - - DataSet<MyPojo> result = myPojos.groupBy("groupMe") - .sortGroup("value", Order.DESCENDING) - .first(1); - - List<MyPojo> resultList = result.collect(); - compareResultAsText(resultList, "A,24.0,Y"); - } - - @Test - public void testDistinct() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env); - - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table distinct = table.select("b").distinct(); - - DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class); - List<Row> results = ds.collect(); - String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n"; - compareResultAsText(results, expected); - } - - @Test - public void testDistinctAfterAggregate() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env); - - Table table = tableEnv.fromDataSet(input, "a, b, c, d, e"); - - Table distinct = table.groupBy("a, e").select("e").distinct(); - - DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class); - List<Row> results = ds.collect(); - String expected = "1\n" + "2\n" + "3\n"; - compareResultAsText(results, expected); - } - - // -------------------------------------------------------------------------------------------- - - public static class MyPojo implements Serializable { - private static final long serialVersionUID = 8741918940120107213L; - - public String groupMe; - public double value; - public String name; - - public MyPojo() { - // for serialization - } - - public MyPojo(String groupMe, double value, String name) { - this.groupMe = groupMe; - this.value = value; - this.name = name; - } - - @Override - public String toString() { - return groupMe + "," + value + "," + name; - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java deleted file mode 100644 index b1ef563..0000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.api.java.batch.table; - -import java.util.Arrays; -import java.util.Collection; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.types.Row; -import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.table.api.Table; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class CalcITCase extends TableProgramsTestBase { - - public CalcITCase(TestExecutionMode mode, TableConfigMode configMode){ - super(mode, configMode); - } - - @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") - public static Collection<Object[]> parameters() { - return Arrays.asList(new Object[][] { - { TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() }, - { TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL() } - }); - } - - @Test - public void testSimpleSelectAllWithAs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds, "a,b,c"); - - Table result = in - .select("a, b, c"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + 
"3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + - "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + - "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + - "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } - - @Test - public void testSimpleSelectWithNaming() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds); - - Table result = in - .select("f0 as a, f1 as b") - .select("a, b"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - compareResultAsText(results, expected); - } - - @Test - public void testSimpleSelectRenameAll() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds); - - Table result = in - .select("f0 as a, f1 as b, f2 as c") - .select("a, b"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + 
"11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testSelectInvalidField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - tableEnv.fromDataSet(ds, "a, b, c") - // Must fail. Field foo does not exist - .select("a + 1, foo + 2"); - } - - @Test(expected = ValidationException.class) - public void testSelectAmbiguousFieldNames() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - - tableEnv.fromDataSet(ds, "a, b, c") - // Must fail. 
Field foo does not exist - .select("a + 1 as foo, b + 2 as foo"); - } - - @Test - public void testSelectStar() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table in = tableEnv.fromDataSet(ds, "a,b,c"); - - Table result = in - .select("*"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + - "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + - "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + - "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } - - @Test - public void testAllRejectingFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter("false"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "\n"; - compareResultAsText(results, expected); - } - - @Test - public void testAllPassingFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter("true"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + - "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + - "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + - "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } - - @Test - public void testFilterOnIntegerTupleField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter(" a % 2 = 0 "); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + - "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; - compareResultAsText(results, expected); - } - - @Test - public void testNotEquals() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result 
= table - .filter("!( a % 2 <> 0 ) "); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + - "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + - "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; - compareResultAsText(results, expected); - } - - @Test - public void testDisjunctivePreds() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter("a < 2 || a > 20"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,Hi\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); - } - - @Test - public void testIntegerBiggerThan128() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello")); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - Table result = table - .filter("a = 300 "); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "300,1,Hello\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testFilterInvalidField() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, 
String>> input = CollectionDataSets.get3TupleDataSet(env); - Table table = tableEnv.fromDataSet(input, "a, b, c"); - - table - // Must fail. Field foo does not exist. - .filter("foo = 17"); - } - - public static class OldHashCode extends ScalarFunction { - public int eval(String s) { - return -1; - } - } - - public static class HashCode extends ScalarFunction { - public int eval(String s) { - return s.hashCode(); - } - } - - @Test - public void testUserDefinedScalarFunction() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - tableEnv.registerFunction("hashCode", new OldHashCode()); - tableEnv.registerFunction("hashCode", new HashCode()); - - DataSource<String> input = env.fromElements("a", "b", "c"); - - Table table = tableEnv.fromDataSet(input, "text"); - - Table result = table.select("text.hashCode()"); - - DataSet<Integer> ds = tableEnv.toDataSet(result, Integer.class); - List<Integer> results = ds.collect(); - String expected = "97\n98\n99"; - compareResultAsText(results, expected); - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java deleted file mode 100644 index b1bb6e8..0000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.java.batch.table; - -import java.util.List; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.api.java.tuple.Tuple6; -import org.apache.flink.api.java.tuple.Tuple8; -import org.apache.flink.types.Row; -import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class CastingITCase extends TableProgramsTestBase { - - public CastingITCase(TestExecutionMode mode, TableConfigMode configMode){ - super(mode, configMode); - } - - @Test - public void testNumericAutocastInArithmetic() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple8<Byte, Short, Integer, Long, 
Float, Double, Long, Double>> input = - env.fromElements(new Tuple8<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table.select("f0 + 1, f1 +" + - " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 + 1.0d, f7 + f0"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"; - compareResultAsText(results, expected); - } - - @Test - public void testNumericAutocastInComparison() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> input = - env.fromElements( - new Tuple6<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d), - new Tuple6<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d)); - - Table table = - tableEnv.fromDataSet(input, "a,b,c,d,e,f"); - - Table result = table - .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "2,2,2,2,2.0,2.0"; - compareResultAsText(results, expected); - } - - @Test - public void testCasting() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple4<Integer, Double, Long, Boolean>> input = - env.fromElements(new Tuple4<>(1, 0.0, 1L, true)); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table.select( - // * -> String - "f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)," + - // NUMERIC TYPE -> Boolean - "f0.cast(BOOL), f1.cast(BOOL), f2.cast(BOOL)," + - // NUMERIC TYPE -> NUMERIC TYPE - "f0.cast(DOUBLE), f1.cast(INT), f2.cast(SHORT)," + - // Boolean -> NUMERIC TYPE - 
"f3.cast(DOUBLE)," + - // identity casting - "f0.cast(INT), f1.cast(DOUBLE), f2.cast(LONG), f3.cast(BOOL)"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,0.0,1,true," + - "true,false,true," + - "1.0,0,1," + - "1.0," + - "1,0.0,1,true\n"; - compareResultAsText(results, expected); - } - - @Test - public void testCastFromString() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSource<Tuple3<String, String, String>> input = - env.fromElements(new Tuple3<>("1", "true", "2.0")); - - Table table = - tableEnv.fromDataSet(input); - - Table result = table.select( - "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "1,1,1,1,2.0,2.0,true\n"; - compareResultAsText(results, expected); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java deleted file mode 100644 index a916998..0000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.java.batch.table; - -import java.util.List; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.types.Row; -import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - - -@RunWith(Parameterized.class) -public class JoinITCase extends TableProgramsTestBase { - - public JoinITCase(TestExecutionMode mode, TableConfigMode configMode){ - super(mode, configMode); - } - - @Test - public void testJoin() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = 
tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("b === e").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"; - compareResultAsText(results, expected); - } - - @Test - public void testJoinWithFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("b === e && b < 2").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "Hi,Hallo\n"; - compareResultAsText(results, expected); - } - - @Test - public void testJoinWithJoinFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("b === e && a < 6 && h < b").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "Hello world, how are you?,Hallo Welt wie\n" + - "I am fine.,Hallo Welt wie\n"; - 
compareResultAsText(results, expected); - } - - @Test - public void testJoinWithMultipleKeys() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2).where("a === d && b === h").select("c, g"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + - "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testJoinNonExistingKey() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - // Must fail. Field foo does not exist. 
- in1.join(in2).where("foo === e").select("c, g"); - } - - @Test(expected = ValidationException.class) - public void testJoinWithNonMatchingKeyTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1.join(in2) - // Must fail. Types of join fields are not compatible (Integer and String) - .where("a === g").select("c, g"); - - tableEnv.toDataSet(result, Row.class).collect(); - } - - @Test(expected = ValidationException.class) - public void testJoinWithAmbiguousFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c"); - - // Must fail. Join input have overlapping field names. 
- in1.join(in2).where("a === d").select("c, g"); - } - - @Test - public void testJoinWithAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - Table result = in1 - .join(in2).where("a === d").select("g.count"); - - DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); - List<Row> results = ds.collect(); - String expected = "6"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testJoinTablesFromDifferentEnvs() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env); - BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tEnv1.fromDataSet(ds1, "a, b, c"); - Table in2 = tEnv2.fromDataSet(ds2, "d, e, f, g, h"); - - // Must fail. Tables are bound to different TableEnvironments. 
- in1.join(in2).where("a === d").select("g.count"); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala index f3f554b..a6e5c56 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala @@ -298,7 +298,7 @@ class CalcITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.registerFunction("hashCode", - new org.apache.flink.table.api.java.batch.table.CalcITCase.OldHashCode) + org.apache.flink.table.api.scala.batch.table.OldHashCode) tEnv.registerFunction("hashCode", MyHashCode) val ds = env.fromElements("a", "b", "c") http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala index a98c258..94c2a5c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala @@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC import 
org.apache.flink.table.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.types.Row -import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.examples.scala.WordCountTable.{WC => MyWC} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils @@ -54,17 +54,6 @@ class AggregationsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testAggregationOnNonExistingField(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - // Must fail. Field 'foo does not exist. - .select('foo.avg) - } - @Test def testWorkingAggregationDataTypes(): Unit = { @@ -142,30 +131,6 @@ class AggregationsITCase( TestBaseUtils.compareResultAsText(result.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testNonWorkingAggregationDataTypes(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements(("Hello", 1)).toTable(tEnv) - // Must fail. Field '_1 is not a numeric type. - .select('_1.sum) - - t.collect() - } - - @Test(expected = classOf[ValidationException]) - def testNoNestedAggregations(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = env.fromElements(("Hello", 1)).toTable(tEnv) - // Must fail. Sum aggregation can not be chained. 
- .select('_2.sum.sum) - } - @Test def testSQLStyleAggregations(): Unit = { @@ -236,30 +201,6 @@ class AggregationsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testGroupingOnNonExistentField(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. '_foo not a valid field - .groupBy('_foo) - .select('a.avg) - } - - @Test(expected = classOf[ValidationException]) - def testGroupingInvalidSelection(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .groupBy('a, 'b) - // must fail. 'c is not a grouping key or aggregation - .select('c) - } - @Test def testGroupedAggregate(): Unit = { @@ -360,9 +301,9 @@ class AggregationsITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .select('b, 4 as 'four, 'a) - .groupBy('b, 'four) - .select('four, 'a.sum) + .select('b, 4 as 'four, 'a) + .groupBy('b, 'four) + .select('four, 'a.sum) val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n" val results = t.toDataSet[Row].collect() @@ -376,11 +317,11 @@ class AggregationsITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) - .groupBy('e, 'b % 3) - .select('c.min, 'e, 'a.avg, 'd.count) + .groupBy('e, 'b % 3) + .select('c.min, 'e, 'a.avg, 'd.count) val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" + - "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1" + "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1" val results = t.toDataSet[Row].collect() 
TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -402,4 +343,3 @@ class AggregationsITCase( } } - http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala index bc4f4bd..164e834 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala @@ -27,11 +27,11 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.types.Row -import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException} +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.expressions.Literal +import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils -import org.junit.Assert._ import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -110,36 +110,6 @@ class CalcITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testSelectInvalidFieldFields(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. 
Field 'foo does not exist - .select('a, 'foo) - } - - @Test(expected = classOf[ValidationException]) - def testSelectAmbiguousRenaming(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. 'a and 'b are both renamed to 'foo - .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print() - } - - @Test(expected = classOf[ValidationException]) - def testSelectAmbiguousRenaming2(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. 'a and 'b are both renamed to 'a - .select('a, 'b as 'a).toDataSet[Row].print() - } - @Test def testSelectStar(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -158,41 +128,6 @@ class CalcITCase( } @Test - def testAliasStarException(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c) - fail("TableException expected") - } catch { - case _: TableException => //ignore - } - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - .select('_1 as '*, '_2 as 'b, '_1 as 'c) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - } - - @Test def testAllRejectingFilter(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment 
val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -326,17 +261,6 @@ class CalcITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testFilterInvalidFieldName(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - // must fail. Field 'foo does not exist - ds.filter( 'foo === 2 ) - } - @Test def testSimpleCalc(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -425,6 +349,19 @@ class CalcITCase( val results = t.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test + def testUserDefinedScalarFunction() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + tableEnv.registerFunction("hashCode", OldHashCode) + tableEnv.registerFunction("hashCode", HashCode) + val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) + val result = table.select("text.hashCode()") + val results = result.toDataSet[Row].collect() + val expected = "97\n98\n99" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } } object CalcITCase { @@ -436,3 +373,11 @@ object CalcITCase { Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava } } + +object HashCode extends ScalarFunction { + def eval(s: String): Int = s.hashCode +} + +object OldHashCode extends ScalarFunction { + def eval(s: String): Int = -1 +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala new file mode 100644 index 0000000..18d3333 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.batch.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.Types._ +import org.apache.flink.test.util.TestBaseUtils.compareResultAsText +import org.apache.flink.types.Row +import org.junit._ + +import scala.collection.JavaConverters._ + +class CastingITCase { + + @Test + def testNumericAutocastInArithmetic() { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + + val table = env.fromElements( + (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv) + .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, + '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1) + + val results = table.toDataSet[Row].collect() + val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1" + compareResultAsText(results.asJava, expected) + } + + @Test + @throws[Exception] + def testNumericAutocastInComparison() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + + val table = env.fromElements( + (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d), + (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d)) + .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f) + .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1) + + val results = table.toDataSet[Row].collect() + val expected: String = "2,2,2,2,2.0,2.0" + compareResultAsText(results.asJava, expected) + } + + @Test + @throws[Exception] + def testCasting() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv) + .select( + // * -> String + '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING), + // NUMERIC TYPE -> Boolean + '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), 
'_3.cast(BOOLEAN), + // NUMERIC TYPE -> NUMERIC TYPE + '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT), + // Boolean -> NUMERIC TYPE + '_4.cast(DOUBLE), + // identity casting + '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN)) + + val results = table.toDataSet[Row].collect() + val expected = "1,0.0,1,true," + "true,false,true," + + "1.0,0,1," + "1.0," + "1,0.0,1,true\n" + compareResultAsText(results.asJava, expected) + } + + @Test + @throws[Exception] + def testCastFromString() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv) + .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG), + '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN)) + + val results = table.toDataSet[Row].collect() + val expected = "1,1,1,1,2.0,2.0,true\n" + compareResultAsText(results.asJava, expected) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala index ce16ada..277db4c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala @@ -103,80 +103,12 @@ class JoinITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testJoinNonExistingKey(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = 
TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - - ds1.join(ds2) - // must fail. Field 'foo does not exist - .where('foo === 'e) - .select('c, 'g) - } - - @Test(expected = classOf[ValidationException]) - def testJoinWithNonMatchingKeyTypes(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - - ds1.join(ds2) - // must fail. Field 'a is Int, and 'g is String - .where('a === 'g) - .select('c, 'g).collect() - } - - @Test(expected = classOf[ValidationException]) - def testJoinWithAmbiguousFields(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c) - - ds1.join(ds2) - // must fail. Both inputs share the same field 'c - .where('a === 'd) - .select('c, 'g) - } - - @Test(expected = classOf[TableException]) - def testNoEqualityJoinPredicate1(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - - ds1.join(ds2) - // must fail. 
No equality join predicate - .where('d === 'f) - .select('c, 'g).collect() - } - - @Test(expected = classOf[TableException]) - def testNoEqualityJoinPredicate2(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - - ds1.join(ds2) - // must fail. No equality join predicate - .where('a < 'd) - .select('c, 'g).collect() - } - @Test def testJoinWithAggregation(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) + // use a different table env so that the tmp table ids are the same + val tEnv2 = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -262,19 +194,6 @@ class JoinITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testJoinTablesFromDifferentEnvs(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv1 = TableEnvironment.getTableEnvironment(env, config) - val tEnv2 = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h) - - // Must fail. Tables are bound to different TableEnvironments. 
- ds1.join(ds2).where('b === 'e).select('c, 'g) - } - @Test def testLeftJoinWithMultipleKeys(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment @@ -297,30 +216,6 @@ class JoinITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testNoJoinCondition(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - tEnv.getConfig.setNullCheck(true) - - val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - - val joinT = ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g) - } - - @Test(expected = classOf[ValidationException]) - def testNoEquiJoin(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - tEnv.getConfig.setNullCheck(true) - - val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - - val joinT = ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g) - } - @Test def testRightJoinWithMultipleKeys(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment @@ -330,7 +225,7 @@ class JoinITCase( val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - val joinT = ds1.rightOuterJoin(ds2, "a = d && b = h").select('c, 'g) + val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g) val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" + "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" + @@ -349,7 +244,7 @@ class JoinITCase( val ds1 = 
CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - val joinT = ds1.rightOuterJoin(ds2, "a = d && b < h").select('c, 'g) + val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g) val expected = "Hello world,BCD\n" val results = joinT.toDataSet[Row].collect() http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala index e369250..4e02a98 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala @@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.types.Row -import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -105,44 +105,6 @@ class SetOperatorsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testUnionDifferentColumnSize(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) - - // must fail. Union inputs have different column size. - ds1.unionAll(ds2) - } - - @Test(expected = classOf[ValidationException]) - def testUnionDifferentFieldTypes(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) - .select('a, 'b, 'c) - - // must fail. Union inputs have different field types. - ds1.unionAll(ds2) - } - - @Test(expected = classOf[ValidationException]) - def testUnionTablesFromDifferentEnvs(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv1 = TableEnvironment.getTableEnvironment(env, config) - val tEnv2 = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) - - // Must fail. Tables are bound to different TableEnvironments. 
- ds1.unionAll(ds2).select('c) - } - @Test def testMinusAll(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment @@ -178,19 +140,6 @@ class SetOperatorsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testMinusDifferentFieldTypes(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) - .select('a, 'b, 'c) - - // must fail. Minus inputs have different field types. - ds1.minus(ds2) - } - @Test def testMinusDifferentFieldNames(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment @@ -207,19 +156,6 @@ class SetOperatorsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testMinusAllTablesFromDifferentEnvs(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv1 = TableEnvironment.getTableEnvironment(env, config) - val tEnv2 = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) - - // Must fail. Tables are bound to different TableEnvironments. 
- ds1.minusAll(ds2).select('c) - } - @Test def testIntersect(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment @@ -275,32 +211,6 @@ class SetOperatorsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testIntersectWithDifferentFieldTypes(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) - .select('a, 'b, 'c) - - // must fail. Intersect inputs have different field types. - ds1.intersect(ds2) - } - - @Test(expected = classOf[ValidationException]) - def testIntersectTablesFromDifferentEnvs(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv1 = TableEnvironment.getTableEnvironment(env, config) - val tEnv2 = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) - - // Must fail. Tables are bound to different TableEnvironments. 
- ds1.intersect(ds2).select('c) - } - @Test def testIntersectWithScalarExpression(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala index 3cbc2c8..2991aaa 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala @@ -172,15 +172,4 @@ class SortITCase( TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } - @Test(expected = classOf[ValidationException]) - def testFetchWithoutOrder(): Unit = { - val env = getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds = CollectionDataSets.get3TupleDataSet(env) - val t = ds.toTable(tEnv).limit(0, 5) - - t.toDataSet[Row].collect() - } - }
