Repository: flink Updated Branches: refs/heads/master aaa6c7aed -> 815bc833f
[FLINK-3942] [tableAPI] Add support for INTERSECT and update document This closes #2159. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/815bc833 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/815bc833 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/815bc833 Branch: refs/heads/master Commit: 815bc833f2c93227c19574119bcf1d00866030b5 Parents: aaa6c7a Author: Jark Wu <wuchong...@alibaba-inc.com> Authored: Fri Jun 24 19:57:09 2016 +0800 Committer: twalthr <twal...@apache.org> Committed: Fri Jul 8 14:52:01 2016 +0200 ---------------------------------------------------------------------- docs/apis/table.md | 50 +++- .../api/table/plan/logical/operators.scala | 38 ++- .../plan/nodes/dataset/DataSetAggregate.scala | 2 +- .../plan/nodes/dataset/DataSetIntersect.scala | 135 +++++++++++ .../table/plan/nodes/dataset/DataSetSort.scala | 1 + .../table/plan/nodes/dataset/DataSetUnion.scala | 10 +- .../api/table/plan/rules/FlinkRuleSets.scala | 3 +- .../rules/dataSet/DataSetIntersectRule.scala | 55 +++++ .../runtime/IntersectCoGroupFunction.scala | 43 ++++ .../org/apache/flink/api/table/table.scala | 56 ++++- .../scala/batch/sql/SetOperatorsITCase.scala | 196 +++++++++++++++ .../flink/api/scala/batch/sql/UnionITCase.scala | 124 ---------- .../scala/batch/table/SetOperatorsITCase.scala | 242 +++++++++++++++++++ .../api/scala/batch/table/UnionITCase.scala | 142 ----------- 14 files changed, 815 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/docs/apis/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/table.md b/docs/apis/table.md index b411db6..afddee9 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -537,6 +537,30 @@ Table result = left.unionAll(right); </tr> <tr> + <td><strong>Intersect</strong></td> + <td> + <p>Similar 
to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.</p> +{% highlight java %} +Table left = tableEnv.fromDataSet(ds1, "a, b, c"); +Table right = tableEnv.fromDataSet(ds2, "d, e, f"); +Table result = left.intersect(right); +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>IntersectAll</strong></td> + <td> + <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p> +{% highlight java %} +Table left = tableEnv.fromDataSet(ds1, "a, b, c"); +Table right = tableEnv.fromDataSet(ds2, "d, e, f"); +Table result = left.intersectAll(right); +{% endhighlight %} + </td> + </tr> + + <tr> <td><strong>Distinct</strong></td> <td> <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p> @@ -696,6 +720,30 @@ val result = left.unionAll(right); </tr> <tr> + <td><strong>Intersect</strong></td> + <td> + <p>Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.</p> +{% highlight scala %} +val left = ds1.toTable(tableEnv, 'a, 'b, 'c); +val right = ds2.toTable(tableEnv, 'e, 'f, 'g); +val result = left.intersect(right); +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>IntersectAll</strong></td> + <td> + <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. 
If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p> +{% highlight scala %} +val left = ds1.toTable(tableEnv, 'a, 'b, 'c); +val right = ds2.toTable(tableEnv, 'e, 'f, 'g); +val result = left.intersectAll(right); +{% endhighlight %} + </td> + </tr> + + <tr> <td><strong>Distinct</strong></td> <td> <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p> @@ -836,7 +884,7 @@ Among others, the following SQL features are not supported, yet: - Non-equi joins and Cartesian products - Result selection by order position (`ORDER BY OFFSET FETCH`) - Grouping sets -- `INTERSECT` and `EXCEPT` set operations +- `EXCEPT` set operation *Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.* http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index 9f57ac9..028983b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -17,11 +17,8 @@ */ package org.apache.flink.api.table.plan.logical -import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.JoinRelType import 
org.apache.calcite.rel.logical.LogicalProject import org.apache.calcite.rex.{RexInputRef, RexNode} import org.apache.calcite.tools.RelBuilder @@ -32,6 +29,9 @@ import org.apache.flink.api.table._ import org.apache.flink.api.table.expressions._ import org.apache.flink.api.table.typeutils.TypeConverter +import scala.collection.JavaConverters._ +import scala.collection.mutable + case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) @@ -248,13 +248,13 @@ case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends Bi override def validate(tableEnv: TableEnvironment): LogicalNode = { val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union] if (left.output.length != right.output.length) { - failValidation(s"Union two table of different column sizes:" + + failValidation(s"Union two tables of different column sizes:" + s" ${left.output.size} and ${right.output.size}") } val sameSchema = left.output.zip(right.output).forall { case (l, r) => l.resultType == r.resultType && l.name == r.name } if (!sameSchema) { - failValidation(s"Union two table of different schema:" + + failValidation(s"Union two tables of different schema:" + s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") } @@ -262,6 +262,34 @@ case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends Bi } } +case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode { + override def output: Seq[Attribute] = left.output + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + left.construct(relBuilder) + right.construct(relBuilder) + relBuilder.intersect(all) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect] + if 
(left.output.length != right.output.length) { + failValidation(s"Intersect two tables of different column sizes:" + + s" ${left.output.size} and ${right.output.size}") + } + // allow different column names between tables + val sameSchema = left.output.zip(right.output).forall { case (l, r) => + l.resultType == r.resultType + } + if (!sameSchema) { + failValidation(s"Intersect two tables of different schema:" + + s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + + s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") + } + resolvedIntersect + } +} + case class Join( left: LogicalNode, right: LogicalNode, http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 910f05c..e71ab6c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -145,7 +145,7 @@ class DataSetAggregate( false, rowTypeInfo.asInstanceOf[TypeInformation[Any]], expectedType.get, - "AggregateOutputConversion", + "DataSetAggregateConversion", rowType.getFieldNames.asScala )) .name(mapName) http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala new file mode 100644 index 0000000..3d88f6b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.runtime.IntersectCoGroupFunction +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which translates Intersect into CoGroup Operator. 
+ * + */ +class DataSetIntersect( + cluster: RelOptCluster, + traitSet: RelTraitSet, + left: RelNode, + right: RelNode, + rowType: RelDataType, + all: Boolean, + ruleDescription: String) + extends BiRel(cluster, traitSet, left, right) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetIntersect( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + rowType, + all, + ruleDescription + ) + } + + override def toString: String = { + s"Intersect(intersect: ($intersectSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("intersect", intersectSelectionToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + val children = this.getInputs + children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) => + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) + } + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + + val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + + val coGroupedDs = leftDataSet.coGroup(rightDataSet) + + val coGroupOpName = s"intersect: ($intersectSelectionToString)" + val coGroupFunction = new IntersectCoGroupFunction[Any](all) + + val intersectDs = coGroupedDs.where("*").equalTo("*") + .`with`(coGroupFunction).name(coGroupOpName) + + val config = tableEnv.getConfig + val leftType = leftDataSet.getType + + // here we only care about left type information, because we emit records from left DataSet + expectedType match { + case None if config.getEfficientTypeUsage => + 
intersectDs + + case _ => + val determinedType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + + // conversion + if (determinedType != leftType) { + val mapFunc = getConversionMapper( + config, + false, + leftType, + determinedType, + "DataSetIntersectConversion", + getRowType.getFieldNames) + + val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + + intersectDs.map(mapFunc).name(opName) + } + // no conversion necessary, forward + else { + intersectDs + } + } + } + + private def intersectSelectionToString: String = { + rowType.getFieldNames.asScala.toList.mkString(", ") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala index 23cfbcf..1af03d8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -109,6 +109,7 @@ class DataSetSort( direction match { case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING + case _ => throw new IllegalArgumentException("Unsupported direction.") } } http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala index 78f64a4..ff1ff29 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala @@ -18,16 +18,16 @@ package org.apache.flink.api.table.plan.nodes.dataset -import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet} +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} +import org.apache.flink.api.table.BatchTableEnvironment -import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** * Flink RelNode which matches along with UnionOperator. 
@@ -55,7 +55,7 @@ class DataSetUnion( } override def toString: String = { - s"Union(union: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))" + s"Union(union: ($unionSelectionToString))" } override def explainTerms(pw: RelWriter): RelWriter = { http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index a2ec08d..68ce354 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -19,12 +19,10 @@ package org.apache.flink.api.table.plan.rules import org.apache.calcite.rel.rules._ -import org.apache.calcite.rel.stream.StreamRules import org.apache.calcite.tools.{RuleSets, RuleSet} import org.apache.flink.api.table.plan.rules.dataSet._ import org.apache.flink.api.table.plan.rules.datastream._ import org.apache.flink.api.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule} -import scala.collection.JavaConversions._ object FlinkRuleSets { @@ -102,6 +100,7 @@ object FlinkRuleSets { DataSetJoinRule.INSTANCE, DataSetScanRule.INSTANCE, DataSetUnionRule.INSTANCE, + DataSetIntersectRule.INSTANCE, DataSetSortRule.INSTANCE, DataSetValuesRule.INSTANCE, BatchTableSourceScanRule.INSTANCE http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala new file mode 100644 index 0000000..f86ec9b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalIntersect +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetIntersect, DataSetConvention} + +class DataSetIntersectRule + extends ConverterRule( + classOf[LogicalIntersect], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetIntersectRule") +{ + + def convert(rel: RelNode): RelNode = { + + val intersect: LogicalIntersect = rel.asInstanceOf[LogicalIntersect] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), DataSetConvention.INSTANCE) + + new DataSetIntersect( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + intersect.all, + description) + } +} + +object DataSetIntersectRule { + val INSTANCE: RelOptRule = new DataSetIntersectRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala new file mode 100644 index 0000000..c39c497 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.functions.CoGroupFunction +import org.apache.flink.util.Collector + + +class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T]{ + override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = { + if (first == null || second == null) return + val leftIter = first.iterator() + val rightIter = second.iterator() + if (all) { + while (leftIter.hasNext && rightIter.hasNext) { + out.collect(leftIter.next) + rightIter.next + } + } else { + if (leftIter.hasNext && rightIter.hasNext) { + out.collect(leftIter.next) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index e415238..5c4cdf0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -403,7 +403,7 @@ class Table( } /** - * Union two [[Table]]s with duplicate records removed. + * Unions two [[Table]]s with duplicate records removed. * Similar to an SQL UNION. The fields of the two union operations must fully overlap. * * Note: Both tables must be bound to the same [[TableEnvironment]]. @@ -426,7 +426,7 @@ class Table( } /** - * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations + * Unions two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations * must fully overlap. * * Note: Both tables must be bound to the same [[TableEnvironment]]. @@ -446,6 +446,58 @@ class Table( } /** + * Intersects two [[Table]]s with duplicate records removed. Intersect returns records that + * exist in both tables. If a record is present in one or both tables more than once, it is + * returned just once, i.e., the resulting table has no duplicate records. Similar to an + * SQL INTERSECT. The fields of the two intersect operations must fully overlap. + * + * Note: Both tables must be bound to the same [[TableEnvironment]]. + * + * Example: + * + * {{{ + * left.intersect(right) + * }}} + */ + def intersect(right: Table): Table = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Intersect on stream tables is currently not supported.") + } + // check that right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new ValidationException( + "Only tables from the same TableEnvironment can be intersected.") + } + new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv)) + } + + /** + * Intersects two [[Table]]s. IntersectAll returns records that exist in both tables. 
+ * If a record is present in both tables more than once, it is returned as many times as it + * is present in both tables, i.e., the resulting table might have duplicate records. Similar + * to an SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap. + * + * Note: Both tables must be bound to the same [[TableEnvironment]]. + * + * Example: + * + * {{{ + * left.intersectAll(right) + * }}} + */ + def intersectAll(right: Table): Table = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Intersect on stream tables is currently not supported.") + } + // check that right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new ValidationException( + "Only tables from the same TableEnvironment can be intersected.") + } + new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv)) + } + + /** * Sorts the given [[Table]]. Similar to SQL ORDER BY. * The resulting Table is globally sorted across all parallel partitions. * http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala new file mode 100644 index 0000000..10ada9d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.batch.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Random + +@RunWith(classOf[Parameterized]) +class SetOperatorsITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testUnionAll(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + 
"Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testUnion(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testUnionWithFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM (" + + "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" + + "WHERE b < 2" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + "Hallo\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testUnionWithAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT count(c) FROM (" + + "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) + + 
val result = tEnv.sql(sqlQuery) + + val expected = "18" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testIntersect(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM t1 INTERSECT SELECT c FROM t2" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val data = new mutable.MutableList[(Int, Long, String)] + data.+=((1, 1L, "Hi")) + data.+=((2, 2L, "Hello")) + data.+=((2, 2L, "Hello")) + data.+=((3, 2L, "Hello world!")) + val ds2 = env.fromCollection(Random.shuffle(data)) + + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + "Hello\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Ignore + // calcite sql parser doesn't support INTERSECT ALL + def testIntersectAll(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM t1 INTERSECT ALL SELECT c FROM t2" + + val data1 = new mutable.MutableList[Int] + data1 += (1, 1, 1, 2, 2) + val data2 = new mutable.MutableList[Int] + data2 += (1, 2, 2, 3) + val ds1 = env.fromCollection(data1) + val ds2 = env.fromCollection(data2) + + tEnv.registerDataSet("t1", ds1, 'c) + tEnv.registerDataSet("t2", ds2, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "1\n2\n2" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testIntersectWithFilter(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = "SELECT c FROM 
((SELECT * FROM t1) INTERSECT (SELECT * FROM t2)) WHERE a > 1" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get3TupleDataSet(env) + + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hello\n" + "Hello world\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala deleted file mode 100644 index 527eac7..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.batch.sql - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class UnionITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { - - @Test - def testUnionAll(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)" - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) - tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) - tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) - - val result = tEnv.sql(sqlQuery) - - val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnion(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)" - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) - tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) - tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) - - val result = tEnv.sql(sqlQuery) - - val 
expected = "Hi\n" + "Hello\n" + "Hello world\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnionWithFilter(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT c FROM (" + - "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" + - "WHERE b < 2" - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) - val ds2 = CollectionDataSets.get5TupleDataSet(env) - tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) - tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) - - val result = tEnv.sql(sqlQuery) - - val expected = "Hi\n" + "Hallo\n" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnionWithAggregation(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val sqlQuery = "SELECT count(c) FROM (" + - "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) - val ds2 = CollectionDataSets.get5TupleDataSet(env) - tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) - tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) - - val result = tEnv.sql(sqlQuery) - - val expected = "18" - val results = result.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala new file mode 
100644 index 0000000..0c7a09c --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.batch.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Random + +@RunWith(classOf[Parameterized]) +class SetOperatorsITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testUnionAll(): Unit = { + val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val unionDs = ds1.unionAll(ds2).select('c) + + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testUnion(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val unionDs = ds1.union(ds2).select('c) + + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTernaryUnionAll(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c) + + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + + "Hi\n" + "Hello\n" + "Hello world\n" + + "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTernaryUnion(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = 
TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val unionDs = ds1.union(ds2).union(ds3).select('c) + + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testUnionDifferentFieldNames(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) + + // must fail. Union inputs have different field names. + ds1.unionAll(ds2) + } + + @Test(expected = classOf[ValidationException]) + def testUnionDifferentFieldTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + .select('a, 'b, 'c) + + // must fail. Union inputs have different field types. 
+ ds1.unionAll(ds2) + } + + @Test(expected = classOf[ValidationException]) + def testUnionTablesFromDifferentEnvs(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env, config) + val tEnv2 = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) + + // Must fail. Tables are bound to different TableEnvironments. + ds1.unionAll(ds2).select('c) + } + + @Test + def testIntersect(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val data = new mutable.MutableList[(Int, Long, String)] + data.+=((1, 1L, "Hi")) + data.+=((2, 2L, "Hello")) + data.+=((2, 2L, "Hello")) + data.+=((3, 2L, "Hello world!")) + val ds2 = env.fromCollection(Random.shuffle(data)).toTable(tEnv, 'a, 'b, 'c) + + val intersectDS = ds1.intersect(ds2).select('c).toDataSet[Row] + + val results = intersectDS.collect() + + val expected = "Hi\n" + "Hello\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testIntersectAll(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val data1 = new mutable.MutableList[Int] + data1 += (1, 1, 1, 2, 2) + val data2 = new mutable.MutableList[Int] + data2 += (1, 2, 2, 2, 3) + val ds1 = env.fromCollection(data1).toTable(tEnv, 'c) + val ds2 = env.fromCollection(data2).toTable(tEnv, 'c) + + val intersectDS = ds1.intersectAll(ds2).select('c).toDataSet[Row] + + val expected = "1\n2\n2" + val results = intersectDS.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + 
def testIntersectWithDifferentFieldNames(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'e, 'f, 'g) + + val intersectDs = ds1.intersect(ds2).select('c) + + val results = intersectDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testIntersectWithDifferentFieldTypes(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + .select('a, 'b, 'c) + + // must fail. Intersect inputs have different field types. + ds1.intersect(ds2) + } + + @Test(expected = classOf[ValidationException]) + def testIntersectTablesFromDifferentEnvs(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv1 = TableEnvironment.getTableEnvironment(env, config) + val tEnv2 = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) + + // Must fail. Tables are bound to different TableEnvironments. 
+ ds1.intersect(ds2).select('c) + } + + @Test + def testIntersectWithScalarExpression(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .select('a + 1, 'b, 'c) + val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + .select('a + 1, 'b, 'c) + + val intersectDs = ds1.intersect(ds2) + + val results = intersectDs.toDataSet[Row].collect() + val expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala deleted file mode 100644 index f472341..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class UnionITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { - - @Test - def testUnionAll(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val unionDs = ds1.unionAll(ds2).select('c) - - val results = unionDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnion(): Unit = { - val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val unionDs = ds1.union(ds2).select('c) - - val results = unionDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hello\n" + "Hello world\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testTernaryUnionAll(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c) - - val results = unionDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hello\n" + "Hello world\n" + - "Hi\n" + "Hello\n" + "Hello world\n" + - "Hi\n" + "Hello\n" + "Hello world\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testTernaryUnion(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - - val unionDs = ds1.union(ds2).union(ds3).select('c) - - val results = unionDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hello\n" + "Hello world\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ValidationException]) - def 
testUnionDifferentFieldNames(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) - - // must fail. Union inputs have different field names. - ds1.unionAll(ds2) - } - - @Test(expected = classOf[ValidationException]) - def testUnionDifferentFieldTypes(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) - .select('a, 'b, 'c) - - // must fail. Union inputs have different field types. - ds1.unionAll(ds2) - } - - @Test(expected = classOf[ValidationException]) - def testUnionTablesFromDifferentEnvs(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv1 = TableEnvironment.getTableEnvironment(env, config) - val tEnv2 = TableEnvironment.getTableEnvironment(env, config) - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) - val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c) - - // Must fail. Tables are bound to different TableEnvironments. - ds1.unionAll(ds2).select('c) - } -}