Repository: flink
Updated Branches:
  refs/heads/master aaa6c7aed -> 815bc833f


[FLINK-3942] [tableAPI] Add support for INTERSECT and update document

This closes #2159.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/815bc833
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/815bc833
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/815bc833

Branch: refs/heads/master
Commit: 815bc833f2c93227c19574119bcf1d00866030b5
Parents: aaa6c7a
Author: Jark Wu <wuchong...@alibaba-inc.com>
Authored: Fri Jun 24 19:57:09 2016 +0800
Committer: twalthr <twal...@apache.org>
Committed: Fri Jul 8 14:52:01 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              |  50 +++-
 .../api/table/plan/logical/operators.scala      |  38 ++-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   2 +-
 .../plan/nodes/dataset/DataSetIntersect.scala   | 135 +++++++++++
 .../table/plan/nodes/dataset/DataSetSort.scala  |   1 +
 .../table/plan/nodes/dataset/DataSetUnion.scala |  10 +-
 .../api/table/plan/rules/FlinkRuleSets.scala    |   3 +-
 .../rules/dataSet/DataSetIntersectRule.scala    |  55 +++++
 .../runtime/IntersectCoGroupFunction.scala      |  43 ++++
 .../org/apache/flink/api/table/table.scala      |  56 ++++-
 .../scala/batch/sql/SetOperatorsITCase.scala    | 196 +++++++++++++++
 .../flink/api/scala/batch/sql/UnionITCase.scala | 124 ----------
 .../scala/batch/table/SetOperatorsITCase.scala  | 242 +++++++++++++++++++
 .../api/scala/batch/table/UnionITCase.scala     | 142 -----------
 14 files changed, 815 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index b411db6..afddee9 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -537,6 +537,30 @@ Table result = left.unionAll(right);
     </tr>
 
     <tr>
+      <td><strong>Intersect</strong></td>
+      <td>
+        <p>Similar to a SQL INTERSECT clause. Intersect returns records that 
exist in both tables. If a record is present in one or both tables more than once, 
it is returned just once, i.e., the resulting table has no duplicate records. 
Both tables must have identical field types.</p>
+{% highlight java %}
+Table left = tableEnv.fromDataSet(ds1, "a, b, c");
+Table right = tableEnv.fromDataSet(ds2, "d, e, f");
+Table result = left.intersect(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>IntersectAll</strong></td>
+      <td>
+        <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records 
that exist in both tables. If a record is present in both tables more than 
once, it is returned as many times as it is present in both tables, i.e., the 
resulting table might have duplicate records. Both tables must have identical 
field types.</p>
+{% highlight java %}
+Table left = tableEnv.fromDataSet(ds1, "a, b, c");
+Table right = tableEnv.fromDataSet(ds2, "d, e, f");
+Table result = left.intersectAll(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
       <td><strong>Distinct</strong></td>
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value 
combinations.</p>
@@ -696,6 +720,30 @@ val result = left.unionAll(right);
     </tr>
 
     <tr>
+      <td><strong>Intersect</strong></td>
+      <td>
+        <p>Similar to a SQL INTERSECT clause. Intersect returns records that 
exist in both tables. If a record is present in one or both tables more than 
once, it is returned just once, i.e., the resulting table has no duplicate 
records. Both tables must have identical field types.</p>
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
+val right = ds2.toTable(tableEnv, 'e, 'f, 'g);
+val result = left.intersect(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>IntersectAll</strong></td>
+      <td>
+        <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records 
that exist in both tables. If a record is present in both tables more than 
once, it is returned as many times as it is present in both tables, i.e., the 
resulting table might have duplicate records. Both tables must have identical 
field types.</p>
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
+val right = ds2.toTable(tableEnv, 'e, 'f, 'g);
+val result = left.intersectAll(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
       <td><strong>Distinct</strong></td>
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value 
combinations.</p>
@@ -836,7 +884,7 @@ Among others, the following SQL features are not supported, 
yet:
 - Non-equi joins and Cartesian products
 - Result selection by order position (`ORDER BY OFFSET FETCH`)
 - Grouping sets
-- `INTERSECT` and `EXCEPT` set operations
+- `EXCEPT` set operation
 
 *Note: Tables are joined in the order in which they are specified in the 
`FROM` clause. In some cases the table order must be manually tweaked to 
resolve Cartesian products.*
 

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 9f57ac9..028983b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -17,11 +17,8 @@
  */
 package org.apache.flink.api.table.plan.logical
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.LogicalProject
 import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
@@ -32,6 +29,9 @@ import org.apache.flink.api.table._
 import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.typeutils.TypeConverter
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
 case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
@@ -248,13 +248,13 @@ case class Union(left: LogicalNode, right: LogicalNode, 
all: Boolean) extends Bi
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
     if (left.output.length != right.output.length) {
-      failValidation(s"Union two table of different column sizes:" +
+      failValidation(s"Union two tables of different column sizes:" +
         s" ${left.output.size} and ${right.output.size}")
     }
     val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
       l.resultType == r.resultType && l.name == r.name }
     if (!sameSchema) {
-      failValidation(s"Union two table of different schema:" +
+      failValidation(s"Union two tables of different schema:" +
         s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] 
and" +
         s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
     }
@@ -262,6 +262,34 @@ case class Union(left: LogicalNode, right: LogicalNode, 
all: Boolean) extends Bi
   }
 }
 
+case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) 
extends BinaryNode {
+  override def output: Seq[Attribute] = left.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+    left.construct(relBuilder)
+    right.construct(relBuilder)
+    relBuilder.intersect(all)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
+    if (left.output.length != right.output.length) {
+      failValidation(s"Intersect two tables of different column sizes:" +
+        s" ${left.output.size} and ${right.output.size}")
+    }
+    // allow different column names between tables
+    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+      l.resultType == r.resultType
+    }
+    if (!sameSchema) {
+      failValidation(s"Intersect two tables of different schema:" +
+        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] 
and" +
+        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
+    }
+    resolvedIntersect
+  }
+}
+
 case class Join(
     left: LogicalNode,
     right: LogicalNode,

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 910f05c..e71ab6c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -145,7 +145,7 @@ class DataSetAggregate(
           false,
           rowTypeInfo.asInstanceOf[TypeInformation[Any]],
           expectedType.get,
-          "AggregateOutputConversion",
+          "DataSetAggregateConversion",
           rowType.getFieldNames.asScala
         ))
         .name(mapName)

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
new file mode 100644
index 0000000..3d88f6b
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.runtime.IntersectCoGroupFunction
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which translates Intersect into CoGroup Operator.
+  *
+  */
+class DataSetIntersect(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    rowType: RelDataType,
+    all: Boolean,
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, left, right)
+    with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetIntersect(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowType,
+      all,
+      ruleDescription
+    )
+  }
+
+  override def toString: String = {
+    s"Intersect(intersect: ($intersectSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("intersect", intersectSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, 
child) =>
+      val rowCnt = metadata.getRowCount(child)
+      val rowSize = this.estimateRowSize(child.getRowType)
+      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet: DataSet[Any] = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet: DataSet[Any] = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+    val coGroupOpName = s"intersect: ($intersectSelectionToString)"
+    val coGroupFunction = new IntersectCoGroupFunction[Any](all)
+
+    val intersectDs = coGroupedDs.where("*").equalTo("*")
+      .`with`(coGroupFunction).name(coGroupOpName)
+
+    val config = tableEnv.getConfig
+    val leftType = leftDataSet.getType
+
+    // here we only care about left type information, because we emit records 
from left DataSet
+    expectedType match {
+      case None if config.getEfficientTypeUsage =>
+        intersectDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != leftType) {
+          val mapFunc = getConversionMapper(
+            config,
+            false,
+            leftType,
+            determinedType,
+            "DataSetIntersectConversion",
+            getRowType.getFieldNames)
+
+          val opName = s"convert: 
(${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          intersectDs.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          intersectDs
+        }
+    }
+  }
+
+  private def intersectSelectionToString: String = {
+    rowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index 23cfbcf..1af03d8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -109,6 +109,7 @@ class DataSetSort(
     direction match {
       case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => 
Order.ASCENDING
       case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => 
Order.DESCENDING
+      case _ => throw new IllegalArgumentException("Unsupported direction.")
     }
 
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index 78f64a4..ff1ff29 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -18,16 +18,16 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, 
RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.api.table.BatchTableEnvironment
 
-import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
 * Flink RelNode which matches along with UnionOperator.
@@ -55,7 +55,7 @@ class DataSetUnion(
   }
 
   override def toString: String = {
-    s"Union(union: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
+    s"Union(union: ($unionSelectionToString))"
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index a2ec08d..68ce354 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -19,12 +19,10 @@
 package org.apache.flink.api.table.plan.rules
 
 import org.apache.calcite.rel.rules._
-import org.apache.calcite.rel.stream.StreamRules
 import org.apache.calcite.tools.{RuleSets, RuleSet}
 import org.apache.flink.api.table.plan.rules.dataSet._
 import org.apache.flink.api.table.plan.rules.datastream._
 import org.apache.flink.api.table.plan.rules.datastream.{DataStreamCalcRule, 
DataStreamScanRule, DataStreamUnionRule}
-import scala.collection.JavaConversions._
 
 object FlinkRuleSets {
 
@@ -102,6 +100,7 @@ object FlinkRuleSets {
     DataSetJoinRule.INSTANCE,
     DataSetScanRule.INSTANCE,
     DataSetUnionRule.INSTANCE,
+    DataSetIntersectRule.INSTANCE,
     DataSetSortRule.INSTANCE,
     DataSetValuesRule.INSTANCE,
     BatchTableSourceScanRule.INSTANCE

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
new file mode 100644
index 0000000..f86ec9b
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalIntersect
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetIntersect, 
DataSetConvention}
+
+class DataSetIntersectRule
+  extends ConverterRule(
+    classOf[LogicalIntersect],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetIntersectRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val intersect: LogicalIntersect = rel.asInstanceOf[LogicalIntersect]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), 
DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), 
DataSetConvention.INSTANCE)
+
+    new DataSetIntersect(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType,
+      intersect.all,
+      description)
+  }
+}
+
+object DataSetIntersectRule {
+  val INSTANCE: RelOptRule = new DataSetIntersectRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
new file mode 100644
index 0000000..c39c497
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.util.Collector
+
+
+class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, 
T]{
+  override def coGroup(first: JIterable[T], second: JIterable[T], out: 
Collector[T]): Unit = {
+    if (first == null || second == null) return
+    val leftIter = first.iterator()
+    val rightIter = second.iterator()
+    if (all) {
+      while (leftIter.hasNext && rightIter.hasNext) {
+        out.collect(leftIter.next)
+        rightIter.next
+      }
+    } else {
+      if (leftIter.hasNext && rightIter.hasNext) {
+        out.collect(leftIter.next)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index e415238..5c4cdf0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -403,7 +403,7 @@ class Table(
   }
 
   /**
-    * Union two [[Table]]s with duplicate records removed.
+    * Unions two [[Table]]s with duplicate records removed.
     * Similar to an SQL UNION. The fields of the two union operations must 
fully overlap.
     *
     * Note: Both tables must be bound to the same [[TableEnvironment]].
@@ -426,7 +426,7 @@ class Table(
   }
 
   /**
-    * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two 
union operations
+    * Unions two [[Table]]s. Similar to an SQL UNION ALL. The fields of the 
two union operations
     * must fully overlap.
     *
     * Note: Both tables must be bound to the same [[TableEnvironment]].
@@ -446,6 +446,58 @@ class Table(
   }
 
   /**
+    * Intersects two [[Table]]s with duplicate records removed. Intersect 
returns records that
+    * exist in both tables. If a record is present in one or both tables more 
than once, it is
+    * returned just once, i.e., the resulting table has no duplicate records. 
Similar to an
+    * SQL INTERSECT. Both tables must have the same number of fields with 
identical field types; the field names may differ.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.intersect(right)
+    * }}}
+    */
+  def intersect(right: Table): Table = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Intersect on stream tables is currently not 
supported.")
+    }
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException(
+        "Only tables from the same TableEnvironment can be intersected.")
+    }
+    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = 
false).validate(tableEnv))
+  }
+
+  /**
+    * Intersects two [[Table]]s. IntersectAll returns records that exist in 
both tables.
+    * If a record is present in both tables more than once, it is returned as 
many times as it
+    * is present in both tables, i.e., the resulting table might have 
duplicate records. Similar
+    * to an SQL INTERSECT ALL. Both tables must have the same number of fields 
with identical field types; the field names may differ.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.intersectAll(right)
+    * }}}
+    */
+  def intersectAll(right: Table): Table = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Intersect on stream tables is currently not 
supported.")
+    }
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException(
+        "Only tables from the same TableEnvironment can be intersected.")
+    }
+    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = 
true).validate(tableEnv))
+  }
+
+  /**
     * Sorts the given [[Table]]. Similar to SQL ORDER BY.
     * The resulting Table is globally sorted across all parallel partitions.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
new file mode 100644
index 0000000..10ada9d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+/**
+  * Integration tests for SQL set operators (UNION, UNION ALL, INTERSECT) on batch tables.
+  *
+  * Every test registers DataSets as tables, runs one SQL query and compares the collected
+  * rows with the expected text.
+  */
+@RunWith(classOf[Parameterized])
+class SetOperatorsITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  /** UNION ALL keeps duplicates: every value of column c appears once per input table. */
+  @Test
+  def testUnionAll(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  /** UNION removes duplicates: the two identical inputs collapse to three distinct values. */
+  @Test
+  def testUnion(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  /** Applies a filter on top of a UNION ALL sub-query. */
+  @Test
+  def testUnionWithFilter(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // t2 has five fields; only a, b, c are selected so that both union inputs line up
+    val sqlQuery = "SELECT c FROM (" +
+      "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
+      "WHERE b < 2"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi\n" + "Hallo\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  /** Applies an aggregation (count) on top of a UNION ALL sub-query. */
+  @Test
+  def testUnionWithAggregation(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT count(c) FROM (" +
+      "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "18"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  /** INTERSECT returns each common value once, even if one input contains it twice. */
+  @Test
+  def testIntersect(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM t1 INTERSECT SELECT c FROM t2"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    // "Hello" is contained twice; "Hello world!" (with '!') is not part of the expected result
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Hi"))
+    data.+=((2, 2L, "Hello"))
+    data.+=((2, 2L, "Hello"))
+    data.+=((3, 2L, "Hello world!"))
+    val ds2 = env.fromCollection(Random.shuffle(data))
+
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi\n" + "Hello\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  // Disabled because the Calcite SQL parser doesn't support INTERSECT ALL.
+  // @Test is declared next to @Ignore so that JUnit still discovers the method and reports
+  // it as ignored; without @Test the method would not be treated as a test at all.
+  @Ignore("calcite sql parser doesn't support INTERSECT ALL")
+  @Test
+  def testIntersectAll(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM t1 INTERSECT ALL SELECT c FROM t2"
+
+    val data1 = new mutable.MutableList[Int]
+    data1 += (1, 1, 1, 2, 2)
+    val data2 = new mutable.MutableList[Int]
+    data2 += (1, 2, 2, 3)
+    val ds1 = env.fromCollection(data1)
+    val ds2 = env.fromCollection(data2)
+
+    tEnv.registerDataSet("t1", ds1, 'c)
+    tEnv.registerDataSet("t2", ds2, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1\n2\n2"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  /** Applies a filter on top of an INTERSECT sub-query. */
+  @Test
+  def testIntersectWithFilter(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM ((SELECT * FROM t1) INTERSECT (SELECT * FROM t2)) WHERE a > 1"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get3TupleDataSet(env)
+
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hello\n" + "Hello world\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
deleted file mode 100644
index 527eac7..0000000
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import 
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class UnionITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testUnionAll(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)"
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
-    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
-    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + 
"Hello world\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnion(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)"
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
-    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
-    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnionWithFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT c FROM (" +
-      "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
-      "WHERE b < 2"
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
-    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "Hi\n" + "Hallo\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnionWithAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT count(c) FROM (" +
-      "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
-    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "18"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
new file mode 100644
index 0000000..0c7a09c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+/**
+  * End-to-end tests for the set operators of the batch Table API: union, unionAll,
+  * intersect and intersectAll.
+  */
+@RunWith(classOf[Parameterized])
+class SetOperatorsITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  /** unionAll of two identical tables yields every value of column c twice. */
+  @Test
+  def testUnionAll(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val rows = left.unionAll(right).select('c).toDataSet[Row].collect()
+
+    val expected = "Hi\nHello\nHello world\nHi\nHello\nHello world\n"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+
+  /** union of two identical tables yields every value of column c once. */
+  @Test
+  def testUnion(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val rows = left.union(right).select('c).toDataSet[Row].collect()
+
+    val expected = "Hi\nHello\nHello world\n"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+
+  /** Chains two unionAll calls over three identical tables. */
+  @Test
+  def testTernaryUnionAll(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val first = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val second = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val third = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val rows = first.unionAll(second).unionAll(third).select('c).toDataSet[Row].collect()
+
+    val expected =
+      "Hi\nHello\nHello world\n" +
+      "Hi\nHello\nHello world\n" +
+      "Hi\nHello\nHello world\n"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+
+  /** Chains two union calls over three identical tables. */
+  @Test
+  def testTernaryUnion(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val first = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val second = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val third = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val rows = first.union(second).union(third).select('c).toDataSet[Row].collect()
+
+    val expected = "Hi\nHello\nHello world\n"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+
+  /** unionAll is expected to be rejected when the inputs have different field names. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentFieldNames(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val threeFields = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val fiveFields = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    // must fail. Union inputs have different field names.
+    threeFields.unionAll(fiveFields)
+  }
+
+  /** unionAll is expected to be rejected when the inputs have different field types. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentFieldTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val projected = CollectionDataSets.get5TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Union inputs have different field types.
+    left.unionAll(projected)
+  }
+
+  /** unionAll is expected to be rejected for tables of different TableEnvironments. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val firstEnv = TableEnvironment.getTableEnvironment(env, config)
+    val secondEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(firstEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(secondEnv, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    left.unionAll(right).select('c)
+  }
+
+  /** intersect returns each common record once, although "Hello" is duplicated on one side. */
+  @Test
+  def testIntersect(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    // "Hello world!" carries a trailing '!' and is not part of the expected result
+    val rightRows = mutable.MutableList(
+      (1, 1L, "Hi"),
+      (2, 2L, "Hello"),
+      (2, 2L, "Hello"),
+      (3, 2L, "Hello world!"))
+    val right = env.fromCollection(Random.shuffle(rightRows)).toTable(tEnv, 'a, 'b, 'c)
+
+    val rows = left.intersect(right).select('c).toDataSet[Row].collect()
+
+    val expected = "Hi\nHello\n"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+
+  /** intersectAll keeps duplicates: 1 appears 3x/1x and 2 appears 2x/3x in the inputs. */
+  @Test
+  def testIntersectAll(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val leftValues = mutable.MutableList(1, 1, 1, 2, 2)
+    val rightValues = mutable.MutableList(1, 2, 2, 2, 3)
+    val left = env.fromCollection(leftValues).toTable(tEnv, 'c)
+    val right = env.fromCollection(rightValues).toTable(tEnv, 'c)
+
+    val rows = left.intersectAll(right).select('c).toDataSet[Row].collect()
+
+    val expected = "1\n2\n2"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+
+  /** intersect of tables whose fields are named differently (a, b, c versus e, f, g). */
+  @Test
+  def testIntersectWithDifferentFieldNames(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'e, 'f, 'g)
+
+    val rows = left.intersect(right).select('c).toDataSet[Row].collect()
+
+    val expected = "Hi\nHello\nHello world\n"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+
+  /** intersect is expected to be rejected when the inputs have different field types. */
+  @Test(expected = classOf[ValidationException])
+  def testIntersectWithDifferentFieldTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val projected = CollectionDataSets.get5TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Intersect inputs have different field types.
+    left.intersect(projected)
+  }
+
+  /** intersect is expected to be rejected for tables of different TableEnvironments. */
+  @Test(expected = classOf[ValidationException])
+  def testIntersectTablesFromDifferentEnvs(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val firstEnv = TableEnvironment.getTableEnvironment(env, config)
+    val secondEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(firstEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(secondEnv, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    left.intersect(right).select('c)
+  }
+
+  /** intersect of two tables whose first field is a computed expression ('a + 1). */
+  @Test
+  def testIntersectWithScalarExpression(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val left = CollectionDataSets.getSmall3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('a + 1, 'b, 'c)
+    val right = CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('a + 1, 'b, 'c)
+
+    val rows = left.intersect(right).toDataSet[Row].collect()
+
+    val expected = "2,1,Hi\n3,2,Hello\n4,2,Hello world\n"
+    TestBaseUtils.compareResultAsText(rows.asJava, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/815bc833/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
deleted file mode 100644
index f472341..0000000
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import 
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException, 
ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class UnionITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testUnionAll(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + 
"Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnion(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-
-    val unionDs = ds1.union(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnionAll(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnion(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-
-    val unionDs = ds1.union(ds2).union(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'd, 'c, 'e)
-
-    // must fail. Union inputs have different field names.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Union inputs have different field types.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 
'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2).select('c)
-  }
-}

Reply via email to