Repository: flink Updated Branches: refs/heads/master 5df0b2759 -> 975339395
[FLINK-3943] [table] Add support for EXCEPT operator This closes #2169. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97533939 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97533939 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97533939 Branch: refs/heads/master Commit: 97533939596c90fab69e2b4000846fc588f2a07b Parents: 5df0b27 Author: Ivan Mushketyk <ivan.mushke...@gmail.com> Authored: Sun Jun 26 15:20:25 2016 +0100 Committer: twalthr <twal...@apache.org> Committed: Mon Jul 11 13:57:49 2016 +0200 ---------------------------------------------------------------------- docs/apis/table.md | 51 ++++++- .../api/table/plan/logical/operators.scala | 31 +++++ .../plan/nodes/dataset/DataSetIntersect.scala | 6 +- .../table/plan/nodes/dataset/DataSetMinus.scala | 134 +++++++++++++++++++ .../api/table/plan/rules/FlinkRuleSets.scala | 1 + .../rules/dataSet/DataSetIntersectRule.scala | 3 +- .../plan/rules/dataSet/DataSetMinusRule.scala | 56 ++++++++ .../runtime/IntersectCoGroupFunction.scala | 1 - .../table/runtime/MinusCoGroupFunction.scala | 47 +++++++ .../org/apache/flink/api/table/table.scala | 50 +++++++ .../scala/batch/sql/SetOperatorsITCase.scala | 67 ++++++++++ .../scala/batch/table/SetOperatorsITCase.scala | 77 +++++++++++ .../scala/stream/table/UnsupportedOpsTest.scala | 11 +- .../api/scala/util/CollectionDataSets.scala | 4 +- 14 files changed, 526 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/docs/apis/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/table.md b/docs/apis/table.md index afddee9..817a84a 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -561,6 +561,30 @@ Table result = left.intersectAll(right); </tr> <tr> + <td><strong>Minus</strong></td> + <td> + <p>Similar to a SQL EXCEPT clause. 
Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p> +{% highlight java %} +Table left = tableEnv.fromDataSet(ds1, "a, b, c"); +Table right = tableEnv.fromDataSet(ds2, "a, b, c"); +Table result = left.minus(right); +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>MinusAll</strong></td> + <td> + <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records from the left table that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p> +{% highlight java %} +Table left = tableEnv.fromDataSet(ds1, "a, b, c"); +Table right = tableEnv.fromDataSet(ds2, "a, b, c"); +Table result = left.minusAll(right); +{% endhighlight %} + </td> + </tr> + + <tr> <td><strong>Distinct</strong></td> <td> <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p> @@ -731,7 +755,7 @@ val result = left.intersect(right); </td> </tr> - <tr> + <tr> <td><strong>IntersectAll</strong></td> <td> <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p> @@ -744,6 +768,30 @@ val result = left.intersectAll(right); </tr> <tr> + <td><strong>Minus</strong></td> + <td> + <p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed.
Both tables must have identical field types.</p> +{% highlight scala %} +val left = ds1.toTable(tableEnv, 'a, 'b, 'c); +val right = ds2.toTable(tableEnv, 'a, 'b, 'c); +val result = left.minus(right); +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>MinusAll</strong></td> + <td> + <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records from the left table that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p> +{% highlight scala %} +val left = ds1.toTable(tableEnv, 'a, 'b, 'c); +val right = ds2.toTable(tableEnv, 'a, 'b, 'c); +val result = left.minusAll(right); +{% endhighlight %} + </td> + </tr> + + <tr> <td><strong>Distinct</strong></td> <td> <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p> @@ -884,7 +932,6 @@ Among others, the following SQL features are not supported, yet: - Non-equi joins and Cartesian products - Result selection by order position (`ORDER BY OFFSET FETCH`) - Grouping sets -- `EXCEPT` set operation *Note: Tables are joined in the order in which they are specified in the `FROM` clause.
In some cases the table order must be manually tweaked to resolve Cartesian products.* http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index 028983b..70d7724 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -236,6 +236,37 @@ case class Aggregate( } } +case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode { + override def output: Seq[Attribute] = left.output + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + left.construct(relBuilder) + right.construct(relBuilder) + relBuilder.minus(all) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + failValidation(s"Minus on stream tables is currently not supported.") + } + + val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus] + if (left.output.length != right.output.length) { + failValidation(s"Minus two tables of different column sizes:" + + s" ${left.output.size} and ${right.output.size}") + } + val sameSchema = left.output.zip(right.output).forall { case (l, r) => + l.resultType == r.resultType + } + if (!sameSchema) { + failValidation(s"Minus two tables of different schemas:" + + s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + + s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") + } + resolvedMinus + } +} + case class Union(left: LogicalNode, right: LogicalNode,
all: Boolean) extends BinaryNode { override def output: Seq[Attribute] = left.output http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala index 3d88f6b..042c28b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala @@ -41,8 +41,7 @@ class DataSetIntersect( left: RelNode, right: RelNode, rowType: RelDataType, - all: Boolean, - ruleDescription: String) + all: Boolean) extends BiRel(cluster, traitSet, left, right) with DataSetRel { @@ -55,8 +54,7 @@ class DataSetIntersect( inputs.get(0), inputs.get(1), rowType, - all, - ruleDescription + all ) } http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala new file mode 100644 index 0000000..d3a2fe7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.runtime.MinusCoGroupFunction +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which implements set minus operation. 
+ * + */ +class DataSetMinus( + cluster: RelOptCluster, + traitSet: RelTraitSet, + left: RelNode, + right: RelNode, + rowType: RelDataType, + all: Boolean) + extends BiRel(cluster, traitSet, left, right) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetMinus( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + rowType, + all + ) + } + + override def toString: String = { + s"Minus(minus: ($minusSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("minus", minusSelectionToString) + } + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + val children = this.getInputs + children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) => + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) + } + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + + val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + + val coGroupedDs = leftDataSet.coGroup(rightDataSet) + + val coGroupOpName = s"minus: ($minusSelectionToString)" + val coGroupFunction = new MinusCoGroupFunction[Any](all) + + val minusDs = coGroupedDs.where("*").equalTo("*") + .`with`(coGroupFunction).name(coGroupOpName) + + val config = tableEnv.getConfig + val leftType = leftDataSet.getType + + // here we only care about left type information, because we emit records from left DataSet + expectedType match { + case None if config.getEfficientTypeUsage => + minusDs + + case _ => + val determinedType = determineReturnType( + getRowType,
expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + + // conversion + if (determinedType != leftType) { + val mapFunc = getConversionMapper( + config, + false, + leftType, + determinedType, + "DataSetMinusConversion", + getRowType.getFieldNames) + + val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + + minusDs.map(mapFunc).name(opName) + } + // no conversion necessary, forward + else { + minusDs + } + } + } + + private def minusSelectionToString: String = { + rowType.getFieldNames.asScala.toList.mkString(", ") + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 68ce354..ddfa578 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -101,6 +101,7 @@ object FlinkRuleSets { DataSetScanRule.INSTANCE, DataSetUnionRule.INSTANCE, DataSetIntersectRule.INSTANCE, + DataSetMinusRule.INSTANCE, DataSetSortRule.INSTANCE, DataSetValuesRule.INSTANCE, BatchTableSourceScanRule.INSTANCE http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala index f86ec9b..c0e3269 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala @@ -45,8 +45,7 @@ class DataSetIntersectRule convLeft, convRight, rel.getRowType, - intersect.all, - description) + intersect.all) } } http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala new file mode 100644 index 0000000..44bead0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalMinus +import org.apache.calcite.rel.rules.UnionToDistinctRule +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus} + +class DataSetMinusRule + extends ConverterRule( + classOf[LogicalMinus], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetMinusRule") +{ + + def convert(rel: RelNode): RelNode = { + + val minus: LogicalMinus = rel.asInstanceOf[LogicalMinus] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(minus.getInput(1), DataSetConvention.INSTANCE) + + new DataSetMinus( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + minus.all) + } +} + +object DataSetMinusRule { + val INSTANCE: RelOptRule = new DataSetMinusRule +} + http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala index c39c497..9930811 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala 
@@ -23,7 +23,6 @@ import java.lang.{Iterable => JIterable} import org.apache.flink.api.common.functions.CoGroupFunction import org.apache.flink.util.Collector - class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T]{ override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = { if (first == null || second == null) return http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala new file mode 100644 index 0000000..cac4fe6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.runtime + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.CoGroupFunction +import org.apache.flink.util.Collector + +class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] { + override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = { + if (first == null || second == null) return + val leftIter = first.iterator + val rightIter = second.iterator + + if (all) { + while (rightIter.hasNext && leftIter.hasNext) { + leftIter.next() + rightIter.next() + } + + while (leftIter.hasNext) { + out.collect(leftIter.next()) + } + } else { + if (!rightIter.hasNext && leftIter.hasNext) { + out.collect(leftIter.next()) + } + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 5c4cdf0..0acf0f9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.plan.logical import org.apache.flink.api.table.plan.logical._ import org.apache.flink.api.table.sinks.TableSink import org.apache.flink.api.table.typeutils.TypeConverter @@ -403,6 +404,55 @@ class Table( } /** + * Minus of two [[Table]]s with duplicate records removed. + * Similar to a SQL EXCEPT clause. 
Minus returns records from the left table that do not + * exist in the right table. Duplicate records in the left table are returned + * exactly once, i.e., duplicates are removed. Both tables must have identical field types. + * + * Note: Both tables must be bound to the same [[TableEnvironment]]. + * + * Example: + * + * {{{ + * left.minus(right) + * }}} + */ + def minus(right: Table): Table = { + // check that right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be " + + "subtracted.") + } + new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = false) + .validate(tableEnv)) + } + + /** + * Minus of two [[Table]]s. Similar to a SQL EXCEPT ALL clause. + * MinusAll returns the records from the left table that do not exist in + * the right table. A record that is present n times in the left table and m times + * in the right table is returned (n - m) times, i.e., as many duplicates as are present + * in the right table are removed. Both tables must have identical field types. + * + * Note: Both tables must be bound to the same [[TableEnvironment]]. + * + * Example: + * + * {{{ + * left.minusAll(right) + * }}} + */ + def minusAll(right: Table): Table = { + // check that right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be " + + "subtracted.") + } + new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = true) + .validate(tableEnv)) + } + + /** * Unions two [[Table]]s with duplicate records removed. * Similar to an SQL UNION. The fields of the two union operations must fully overlap.
* http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala index 10ada9d..b25f84a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala @@ -124,6 +124,73 @@ class SetOperatorsITCase( } @Test + def testExcept(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = env.fromElements((1, 1L, "Hi")) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hello\n" + "Hello world\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Ignore + // calcite sql parser doesn't support EXCEPT ALL + def testExceptAll(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM t1 EXCEPT ALL SELECT c FROM t2" + + val data1 = new mutable.MutableList[Int] + data1 += (1, 1, 1, 2, 2) + val data2 = new mutable.MutableList[Int] + data2 += (1, 2, 2, 3) + val ds1 = env.fromCollection(data1) + val ds2 = env.fromCollection(data2) + + tEnv.registerDataSet("t1", ds1, 'c) + tEnv.registerDataSet("t2", ds2, 'c) + + val result = 
tEnv.sql(sqlQuery) + + val expected = "1\n1" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testExceptWithFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM (" + + "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" + + "WHERE b < 2" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test def testIntersect(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala index 0c7a09c..83579e0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala @@ -143,6 +143,83 @@ class SetOperatorsITCase( } @Test + def testMinusAll(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c) + + val minusDs = ds1.unionAll(ds1).unionAll(ds1) + .minusAll(ds2.unionAll(ds2)).select('c) + + val results = minusDs.toDataSet[Row].collect() + val expected = "Hi\n" + + "Hello\n" + "Hello world\n" + + "Hello\n" + "Hello world\n" + + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testMinus(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c) + + val minusDs = ds1.unionAll(ds1).unionAll(ds1) + .minus(ds2.unionAll(ds2)).select('c) + + val results = minusDs.toDataSet[Row].collect() + val expected = "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testMinusDifferentFieldTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + .select('a, 'b, 'c) + + // must fail. Minus inputs have different field types. 
+ ds1.minus(ds2) + } + + @Test + def testMinusDifferentFieldNames(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'd, 'e, 'f) + + val minusDs = ds1.unionAll(ds1).unionAll(ds1) + .minus(ds2.unionAll(ds2)).select('c) + + val results = minusDs.toDataSet[Row].collect() + val expected = "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testMinusAllTablesFromDifferentEnvs(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env, config) + val tEnv2 = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) + + // Must fail. Tables are bound to different TableEnvironments. 
+ ds1.minusAll(ds2).select('c) + } + + @Test def testIntersect(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala index 92de6f1..df22f2f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.scala.stream.table import org.apache.flink.api.scala.stream.utils.StreamTestData import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{TableEnvironment, TableException} +import org.apache.flink.api.table.{ValidationException, TableEnvironment, TableException} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Test @@ -73,4 +73,13 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) t1.union(t2) } + + @Test(expected = classOf[ValidationException]) + def testMinus(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + t1.minus(t2) + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala index d1b6f52..ec1a810 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala @@ -17,12 +17,10 @@ */ package org.apache.flink.api.scala.util -import org.apache.hadoop.io.IntWritable - import org.apache.flink.api.scala._ +import org.apache.hadoop.io.IntWritable import scala.collection.mutable -import scala.reflect.classTag import scala.util.Random /**
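
---

For reference, the per-group behavior of the patch's `MinusCoGroupFunction` — pair off matching left/right records, then emit the left-side surplus — is equivalent to a counting formulation. The sketch below reproduces that EXCEPT / EXCEPT ALL multiset semantics in plain Java outside of Flink; the class and method names (`MinusSemantics`, `minus`, `minusAll`) are illustrative only and not part of the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MinusSemantics {

    // EXCEPT ALL: a record occurring n times on the left and m times on the
    // right is emitted max(n - m, 0) times -- the counting equivalent of the
    // pair-and-discard loop in MinusCoGroupFunction's `all` branch.
    static <T> List<T> minusAll(List<T> left, List<T> right) {
        Map<T, Long> remaining = new HashMap<>(
            right.stream().collect(Collectors.groupingBy(x -> x, Collectors.counting())));
        List<T> out = new ArrayList<>();
        for (T t : left) {
            long m = remaining.getOrDefault(t, 0L);
            if (m > 0) {
                remaining.put(t, m - 1); // cancel one right-side occurrence
            } else {
                out.add(t);              // left-side surplus survives
            }
        }
        return out;
    }

    // EXCEPT: a left record is emitted exactly once iff the right side has no
    // match at all, like the non-`all` branch of the CoGroupFunction.
    static <T> List<T> minus(List<T> left, List<T> right) {
        Set<T> rightSet = new HashSet<>(right);
        return left.stream().distinct()
            .filter(t -> !rightSet.contains(t))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // same data as the (ignored) testExceptAll case in SetOperatorsITCase
        List<Integer> l = Arrays.asList(1, 1, 1, 2, 2);
        List<Integer> r = Arrays.asList(1, 2, 2, 3);
        System.out.println(minusAll(l, r)); // [1, 1]
        System.out.println(minus(l, r));    // []
    }
}
```

The first call matches the expected output of `testExceptAll` ("1\n1"); the second shows why plain EXCEPT yields nothing here: every distinct left value has at least one right-side match.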