Repository: flink Updated Branches: refs/heads/master 614acc3e7 -> 649cf054e
http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala new file mode 100644 index 0000000..ace53d9 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.stringexpr + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils +import org.junit._ + +class AggregationsStringExpressionTest { + + @Test + def testAggregationTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + + val t1 = t.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg) + val t2 = t.select("_1.sum, _1.min, _1.max, _1.count, _1.avg") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testWorkingAggregationDataTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = env.fromElements( + (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), + (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv) + + val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count) + val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testProjection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = env.fromElements( + (1: Byte, 1: Short), + (2: Byte, 2: Short)).toTable(tEnv) + + val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum) + val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testAggregationWithArithmetic(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv) + + val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5) + val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testAggregationWithTwoCount(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv) + + val t1 = t.select('_1.count, '_2.count) + val t2 = t.select("_1.count, _2.count") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testAggregationAfterProjection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = env.fromElements( + (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), + (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv) + + val t1 = t.select('_1, '_2, '_3) + .select('_1.avg, '_2.sum, '_3.count) + + val t2 = t.select("_1, _2, _3") + .select("_1.avg, _2.sum, _3.count") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testDistinct(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val distinct = ds.select('b).distinct() + val distinct2 = ds.select("b").distinct() + + val lPlan1 = distinct.logicalPlan + val lPlan2 = distinct2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testDistinctAfterAggregate(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + + val distinct = ds.groupBy('a, 'e).select('e).distinct() + val distinct2 = ds.groupBy("a, e").select("e").distinct() + + val lPlan1 = distinct.logicalPlan + val lPlan2 = distinct2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testGroupedAggregate(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t.groupBy('b).select('b, 'a.sum) + val t2 = t.groupBy("b").select("b, a.sum") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testGroupingKeyForwardIfNotUsed(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t.groupBy('b).select('a.sum) + val t2 = t.groupBy("b").select("a.sum") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testGroupNoAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t + .groupBy('b) + .select('a.sum as 'd, 'b) + .groupBy('b, 'd) + .select('b) + + val t2 = t + .groupBy("b") + .select("a.sum as d, b") + .groupBy("b, d") + .select("b") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testGroupedAggregateWithConstant1(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t.select('a, 4 as 'four, 'b) + .groupBy('four, 'a) + .select('four, 'b.sum) + + val t2 = t.select("a, 4 as four, b") + .groupBy("four, a") + .select("four, b.sum") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testGroupedAggregateWithConstant2(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t.select('b, 4 as 'four, 'a) + .groupBy('b, 'four) + .select('four, 'a.sum) + val t2 = t.select("b, 4 as four, a") + .groupBy("b, four") + .select("four, a.sum") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testGroupedAggregateWithExpression(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + + val t1 = t.groupBy('e, 'b % 3) + .select('c.min, 'e, 'a.avg, 'd.count) + val t2 = t.groupBy("e, b % 3") + .select("c.min, e, a.avg, d.count") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testGroupedAggregateWithFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t.groupBy('b) + .select('b, 'a.sum) + .where('b === 2) + val t2 = t.groupBy("b") + .select("b, a.sum") + .where("b = 2") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala new file mode 100644 index 0000000..a5a5241 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.stringexpr + +import java.sql.{Date, Time, Timestamp} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils +import org.apache.flink.table.api.{TableEnvironment, Types} +import org.apache.flink.table.expressions.Literal +import org.junit._ + +class CalcStringExpressionTest { + + @Test + def testSimpleSelectAllWithAs(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t.select('a, 'b, 'c) + val t2 = t.select("a, b, c") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testSimpleSelectWithNaming(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + + val t1 = t + .select('_1 as 'a, '_2 as 'b, '_1 as 'c) + .select('a, 'b) + + val t2 = t + .select("_1 as a, _2 as b, _1 as c") + .select("a, b") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testSimpleSelectRenameAll(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + + val t1 = t + .select('_1 as 'a, '_2 as 'b, '_3 as 'c) + .select('a, 'b) + + val t2 = t + .select("_1 as a, _2 as b, _3 as c") + .select("a, b") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testSelectStar(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = t.select('*) + val t2 = t.select("*") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testAllRejectingFilter(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = ds.filter( Literal(false) ) + val t2 = ds.filter("false") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testAllPassingFilter(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = ds.filter( Literal(true) ) + val t2 = ds.filter("true") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testFilterOnStringTupleField(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = ds.filter( 'c.like("%world%") ) + val t2 = ds.filter("c.like('%world%')") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testFilterOnIntegerTupleField(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = ds.filter( 'a % 2 === 0 ) + val t2 = ds.filter( "a % 2 = 0 ") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testNotEquals(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = ds.filter( 'a % 2 !== 0 ) + val t2 = ds.filter("a % 2 <> 0") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testDisjunctivePredicate(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = ds.filter( 'a < 2 || 'a > 20) + val t2 = ds.filter("a < 2 || a > 20") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testConsecutiveFilters(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0) + val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testFilterBasicType(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.getStringDataSet(env).toTable(tEnv, 'a) + + val t1 = ds.filter( 'a.like("H%") ) + val t2 = ds.filter( "a.like('H%')" ) + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testFilterOnCustomType(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.getCustomTypeDataSet(env) + val t = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's) + + val t1 = t.filter( 's.like("%a%") ) + val t2 = t.filter("s.like('%a%')") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testSimpleCalc(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + + val t1 = t.select('_1, '_2, '_3) + .where('_1 < 7) + .select('_1, '_3) + + val t2 = t.select("_1, _2, _3") + .where("_1 < 7") + .select("_1, _3") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testCalcWithTwoFilters(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + + val t1 = t.select('_1, '_2, '_3) + .where('_1 < 7 && '_2 === 3) + .select('_1, '_3) + .where('_1 === 4) + .select('_1) + + val t2 = t.select("_1, _2, _3") + .where("_1 < 7 && _2 = 3") + .select("_1, _3") + .where("_1 === 4") + .select("_1") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testCalcWithAggregation(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + + val t1 = t.select('_1, '_2, '_3) + .where('_1 < 15) + .groupBy('_2) + .select('_1.min, '_2.count as 'cnt) + .where('cnt > 3) + + + val t2 = t.select("_1, _2, _3") + .where("_1 < 15") + .groupBy("_2") + .select("_1.min, _2.count as cnt") + .where("cnt > 3") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testCalcJoin(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f) + .where('b > 1).select('a, 'd).where('d === 2) + val t2 = ds1.select("a, b").join(ds2).where("b = e").select("a, b, d, e, f") + .where("b > 1").select("a, d").where("d = 2") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testAdvancedDataTypes(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = env + .fromElements(( + BigDecimal("78.454654654654654").bigDecimal, + BigDecimal("4E+9999").bigDecimal, + Date.valueOf("1984-07-12"), + Time.valueOf("14:34:24"), + Timestamp.valueOf("1984-07-12 14:34:24"))) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + + val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal, + "1984-07-12".cast(Types.DATE), "14:34:24".cast(Types.TIME), + "1984-07-12 14:34:24".cast(Types.TIMESTAMP)) + val t2 = t.select("a, b, c, d, e, 11.2, 11.2," + + "'1984-07-12'.toDate, '14:34:24'.toTime," + + "'1984-07-12 14:34:24'.toTimestamp") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1.toString, lPlan2.toString) + } + + @Test + def testIntegerBiggerThan128(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val t = env.fromElements((300, 1L, "Hello")).toTable(tableEnv, 'a, 'b, 'c) + + val t1 = t.filter('a === 300) + val t2 = t.filter("a = 300") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala new file mode 100644 index 0000000..19d27fe --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.stringexpr + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.Types._ +import org.apache.flink.table.api.scala._ +import org.junit._ + +class CastingStringExpressionTest { + + @Test + def testNumericAutocastInArithmetic() { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + + val table = env.fromElements( + (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv) + val t1 = table.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, + '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1) + val t2 = table.select("_1 + 1, _2 +" + + " 1, _3 + 1L, _4 + 1.0f, _5 + 1.0d, _6 + 1, _7 + 1.0d, _8 + _1") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + @throws[Exception] + def testNumericAutocastInComparison() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + + val table = env.fromElements( + (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d), + (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d)) + .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f) + val t1 = table.filter('a > 1 && 'b > 1 && 'c > 1L && + 'd > 1.0f && 'e > 1.0d && 'f > 1) + val t2 = table + .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + @throws[Exception] + def testCasting() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv) + val t1 = table .select( + // * -> String + '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING), + // NUMERIC TYPE -> Boolean + '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN), + // NUMERIC TYPE -> NUMERIC TYPE + '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT), + // Boolean -> NUMERIC TYPE + '_4.cast(DOUBLE), // identity casting + '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN)) + val t2 = table.select( + // * -> String + "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," + + // NUMERIC TYPE -> Boolean + "_1.cast(BOOL), _2.cast(BOOL), _3.cast(BOOL)," + + // NUMERIC TYPE -> NUMERIC TYPE + "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," + + // Boolean -> NUMERIC TYPE + "_4.cast(DOUBLE)," + + // identity casting + "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOL)") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + @throws[Exception] + def testCastFromString() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv) + val t1 = table .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG), + '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN)) + val t2 = table.select( + "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " + + "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOL)") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala new file mode 100644 index 0000000..025cda9 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.stringexpr + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils +import org.apache.flink.table.expressions.Literal +import org.junit._ + +class JoinStringExpressionTest { + + @Test + def testJoin(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1Scala = ds1.join(ds2).where('b === 'e).select('c, 'g) + val t1Java = ds1.join(ds2).where("b === e").select("c, g") + + val lPlan1 = t1Scala.logicalPlan + val lPlan2 = t1Java.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testJoinWithFilter(): Unit = { + + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h) + + val t1Scala = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g) + val t1Java = ds1.join(ds2).where("b === e && b < 2").select("c, g") + + val lPlan1 = t1Scala.logicalPlan + val lPlan2 = t1Java.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testJoinWithJoinFilter(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1Scala = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g) + val t1Java = ds1.join(ds2).where("b === e && a < 6 && h < b").select("c, g") + + val lPlan1 = t1Scala.logicalPlan + val lPlan2 = t1Java.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testJoinWithMultipleKeys(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1Scala = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g) + val t1Java = ds1.join(ds2).filter("a === d && b === h").select("c, g") + + val lPlan1 = t1Scala.logicalPlan + val lPlan2 = t1Java.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testJoinWithAggregation(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1Scala = ds1.join(ds2).where('a === 'd).select('g.count) + val t1Java = ds1.join(ds2).where("a === d").select("g.count") + + val lPlan1 = t1Scala.logicalPlan + val lPlan2 = t1Java.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testJoinWithGroupedAggregation(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.join(ds2) + .where('a === 'd) + .groupBy('a, 'd) + .select('b.sum, 'g.count) + val t2 = ds1.join(ds2) + .where("a = d") + .groupBy("a, d") + .select("b.sum, g.count") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", + LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString), + LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)) + } + + @Test + def testJoinPushThroughJoin(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l) + + val t1 = ds1.join(ds2) + .where(Literal(true)) + .join(ds3) + .where('a === 'd && 'e === 'k) + .select('a, 'f, 'l) + val t2 = ds1.join(ds2) + .where("true") + .join(ds3) + .where("a === d && e === k") + .select("a, f, l") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testJoinWithDisjunctivePred(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g) + val t2 = ds1.join(ds2).filter("a = d && (b = e || b = e - 10)").select("c, g") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testJoinWithExpressionPreds(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g) + val t2 = ds1.join(ds2).filter("b = h + 1 && a - 1 = d + 2").select("c, g") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testLeftJoinWithMultipleKeys(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setNullCheck(true) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g) + val t2 = ds1.leftOuterJoin(ds2, "a = d && b = h").select("c, g") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testRightJoinWithMultipleKeys(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setNullCheck(true) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g) + val t2 = ds1.rightOuterJoin(ds2, "a = d && b = h").select("c, g") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testRightJoinWithNotOnlyEquiJoin(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setNullCheck(true) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g) + val t2 = ds1.rightOuterJoin(ds2, "a = d && b < h").select("c, g") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + + @Test + def testFullOuterJoinWithMultipleKeys(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setNullCheck(true) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + val t1 = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g) + val t2 = ds1.fullOuterJoin(ds2, "a = d && b = h").select("c, g") + + val lPlan1 = t1.logicalPlan + val lPlan2 = t2.logicalPlan + + Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala new file mode 100644 index 0000000..5cf3f84 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.junit._ + +class AggregationsValidationTest { + + @Test(expected = classOf[ValidationException]) + def testNonWorkingAggregationDataTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + env.fromElements(("Hello", 1)).toTable(tEnv) + // Must fail. Field '_1 is not a numeric type. + .select('_1.sum) + } + + @Test(expected = classOf[ValidationException]) + def testNoNestedAggregations(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + env.fromElements(("Hello", 1)).toTable(tEnv) + // Must fail. Sum aggregation can not be chained. + .select('_2.sum.sum) + } + + @Test(expected = classOf[ValidationException]) + def testGroupingOnNonExistentField(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. '_foo not a valid field + .groupBy('_foo) + .select('a.avg) + } + + @Test(expected = classOf[ValidationException]) + def testGroupingInvalidSelection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .groupBy('a, 'b) + // must fail. 'c is not a grouping key or aggregation + .select('c) + } + + @Test(expected = classOf[ValidationException]) + def testAggregationOnNonExistingField(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + // Must fail. Field 'foo does not exist. + .select('foo.avg) + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testAggregationOnNonExistingFieldJava() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv) + table.select("foo.avg") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testNonWorkingAggregationDataTypesJava() { + val env= ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = env.fromElements((1f, "Hello")).toTable(tableEnv) + // Must fail. Cannot compute SUM aggregate on String field. + table.select("f1.sum") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testNoNestedAggregationsJava() { + val env= ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = env.fromElements((1f, "Hello")).toTable(tableEnv) + // Must fail. Aggregation on aggregation not allowed. + table.select("f0.sum.sum") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testGroupingOnNonExistentFieldJava() { + val env= ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c) + input + // must fail. Field foo is not in input + .groupBy("foo") + .select("a.avg") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testGroupingInvalidSelectionJava() { + val env= ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c) + input + .groupBy("a, b") + // must fail. Field c is not a grouping key or aggregation + .select("c") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala new file mode 100644 index 0000000..846585b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +class CalcValidationTest { + + @Test(expected = classOf[ValidationException]) + def testSelectInvalidFieldFields(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. Field 'foo does not exist + .select('a, 'foo) + } + + @Test(expected = classOf[ValidationException]) + def testSelectAmbiguousRenaming(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. 'a and 'b are both renamed to 'foo + .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print() + } + + @Test(expected = classOf[ValidationException]) + def testSelectAmbiguousRenaming2(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + // must fail. 'a and 'b are both renamed to 'a + .select('a, 'b as 'a).toDataSet[Row].print() + } + + @Test(expected = classOf[ValidationException]) + def testFilterInvalidFieldName(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + // must fail. Field 'foo does not exist + ds.filter( 'foo === 2 ) + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testSelectInvalidField() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c) + + // Must fail. Field foo does not exist + ds.select("a + 1, foo + 2") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testSelectAmbiguousFieldNames() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c) + + // Must fail. Field foo does not exist + ds.select("a + 1 as foo, b + 2 as foo") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testFilterInvalidField() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c) + + // Must fail. Field foo does not exist. + table.filter("foo = 17") + } + + @Test + def testAliasStarException(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c) + fail("TableException expected") + } catch { + case _: TableException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + .select('_1 as '*, '_2 as 'b, '_1 as 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } +try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala new file mode 100644 index 0000000..4837fcc --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException} +import org.junit._ + +class JoinValidationTest { + + @Test(expected = classOf[ValidationException]) + def testJoinNonExistingKey(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + ds1.join(ds2) + // must fail. Field 'foo does not exist + .where('foo === 'e) + .select('c, 'g) + } + + @Test(expected = classOf[ValidationException]) + def testJoinWithNonMatchingKeyTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + ds1.join(ds2) + // must fail. Field 'a is Int, and 'g is String + .where('a === 'g) + .select('c, 'g).collect() + } + + @Test(expected = classOf[ValidationException]) + def testJoinWithAmbiguousFields(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c) + + ds1.join(ds2) + // must fail. Both inputs share the same field 'c + .where('a === 'd) + .select('c, 'g) + } + + @Test(expected = classOf[TableException]) + def testNoEqualityJoinPredicate1(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + ds1.join(ds2) + // must fail. No equality join predicate + .where('d === 'f) + .select('c, 'g).collect() + } + + @Test(expected = classOf[TableException]) + def testNoEqualityJoinPredicate2(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + ds1.join(ds2) + // must fail. No equality join predicate + .where('a < 'd) + .select('c, 'g).collect() + } + + @Test(expected = classOf[ValidationException]) + def testJoinTablesFromDifferentEnvs(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env) + val tEnv2 = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h) + + // Must fail. Tables are bound to different TableEnvironments. + ds1.join(ds2).where('b === 'e).select('c, 'g) + } + + @Test(expected = classOf[ValidationException]) + def testNoJoinCondition(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setNullCheck(true) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g) + } + + @Test(expected = classOf[ValidationException]) + def testNoEquiJoin(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setNullCheck(true) + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + + ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g) + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testJoinNonExistingKeyJava() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) + val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'h) + // Must fail. Field foo does not exist. + in1.join(in2).where("foo === e").select("c, g") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testJoinWithNonMatchingKeyTypesJava() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) + val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c) + in1.join(in2) + // Must fail. Types of join fields are not compatible (Integer and String) + .where("a === g").select("c, g") + } + + @Test(expected = classOf[ValidationException]) + @throws[Exception] + def testJoinWithAmbiguousFieldsJava() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env) + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) + val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c) + // Must fail. Join input have overlapping field names. + in1.join(in2).where("a === d").select("c, g") + } + + @Test(expected = classOf[ValidationException]) + def testJoinTablesFromDifferentEnvsJava() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env) + val tEnv2 = TableEnvironment.getTableEnvironment(env) + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c) + val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c) + // Must fail. Tables are bound to different TableEnvironments. + in1.join(in2).where("a === d").select("g.count") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala new file mode 100644 index 0000000..baac9ea --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.junit._ + +class SetOperatorsValidationTest { + + @Test(expected = classOf[ValidationException]) + def testUnionDifferentColumnSize(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) + + // must fail. Union inputs have different column size. + ds1.unionAll(ds2) + } + + @Test(expected = classOf[ValidationException]) + def testUnionDifferentFieldTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + .select('a, 'b, 'c) + + // must fail. Union inputs have different field types. + ds1.unionAll(ds2) + } + + @Test(expected = classOf[ValidationException]) + def testUnionTablesFromDifferentEnvs(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env) + val tEnv2 = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) + + // Must fail. Tables are bound to different TableEnvironments. + ds1.unionAll(ds2).select('c) + } + + @Test(expected = classOf[ValidationException]) + def testMinusDifferentFieldTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + .select('a, 'b, 'c) + + // must fail. Minus inputs have different field types. + ds1.minus(ds2) + } + + @Test(expected = classOf[ValidationException]) + def testMinusAllTablesFromDifferentEnvs(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env) + val tEnv2 = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) + + // Must fail. Tables are bound to different TableEnvironments. + ds1.minusAll(ds2).select('c) + } + + @Test(expected = classOf[ValidationException]) + def testIntersectWithDifferentFieldTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + .select('a, 'b, 'c) + + // must fail. Intersect inputs have different field types. + ds1.intersect(ds2) + } + + @Test(expected = classOf[ValidationException]) + def testIntersectTablesFromDifferentEnvs(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env) + val tEnv2 = TableEnvironment.getTableEnvironment(env) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) + + // Must fail. Tables are bound to different TableEnvironments. + ds1.intersect(ds2).select('c) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala new file mode 100644 index 0000000..4188f51 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.table.validation + +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.types.Row +import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.junit._ + +class SortValidationTest { + + def getExecutionEnvironment = { + val env = ExecutionEnvironment.getExecutionEnvironment + env.setParallelism(4) + env + } + + @Test(expected = classOf[ValidationException]) + def testFetchWithoutOrder(): Unit = { + val env = getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env) + val t = ds.toTable(tEnv).limit(0, 5) + + t.toDataSet[Row].collect() + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala new file mode 100644 index 0000000..435d6b9 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.batch.utils + +object LogicalPlanFormatUtils { + private val tempPattern = """TMP_\d+""".r + + def formatTempTableId(preStr: String): String = { + val str = preStr.replaceAll("ArrayBuffer\\(", "List\\(") + val minId = getMinTempTableId(str) + tempPattern.replaceAllIn(str, s => "TMP_" + (s.matched.substring(4).toInt - minId) ) + } + + private def getMinTempTableId(logicalStr: String): Long = { + tempPattern.findAllIn(logicalStr).map(s => { + s.substring(4).toInt + }).min + } +}
