[FLINK-5143] [table] Add EXISTS to list of supported SQL operators. This closes #2853.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06d252e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06d252e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06d252e8 Branch: refs/heads/master Commit: 06d252e89b58c5947b331cf24a552d11ff8767e8 Parents: 7d91b9e Author: twalthr <[email protected]> Authored: Wed Nov 23 10:44:20 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Wed Nov 23 18:35:44 2016 +0100 ---------------------------------------------------------------------- docs/dev/table_api.md | 18 ++--- .../table/plan/nodes/dataset/DataSetJoin.scala | 2 +- .../api/table/validate/FunctionCatalog.scala | 3 +- .../api/scala/batch/sql/SetOperatorsTest.scala | 75 ++++++++++++++++++++ .../flink/api/table/utils/TableTestBase.scala | 26 +++++++ 5 files changed, 114 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index bb0e500..4d8c953 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -2800,39 +2800,41 @@ value NOT IN (value [, value]* ) <p>Whether <i>value</i> is not equal to every value in a list.</p> </td> </tr> -<!-- NOT SUPPORTED SO FAR + <tr> <td> {% highlight text %} -value IN (sub-query) +EXISTS (sub-query) {% endhighlight %} </td> <td> - <p>Whether <i>value</i> is equal to a row returned by sub-query.</p> + <p>Whether <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p> </td> </tr> +<!-- NOT SUPPORTED SO FAR <tr> <td> {% highlight text %} -value NOT IN (sub-query) +value IN (sub-query) {% endhighlight %} </td> <td> - <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p> + <p>Whether <i>value</i> is equal to a row returned by sub-query.</p> </td> </tr> <tr> <td> {% highlight text %} -EXISTS (sub-query) +value NOT IN (sub-query) {% endhighlight %} </td> <td> - <p>Whether sub-query returns at least one row.</p> + <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p> </td> - </tr>--> + </tr> + --> </tbody> </table> http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index bbb6325..6d7a30e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -215,7 +215,7 @@ class DataSetJoin( } private def joinTypeToString = joinType match { - case JoinRelType.INNER => "Join" + case JoinRelType.INNER => "InnerJoin" case JoinRelType.LEFT=> "LeftOuterJoin" case JoinRelType.RIGHT => "RightOuterJoin" case JoinRelType.FULL => "FullOuterJoin" http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala index 68e2f97..679733c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -281,7 +281,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.CAST, SqlStdOperatorTable.EXTRACT, SqlStdOperatorTable.QUARTER, - SqlStdOperatorTable.SCALAR_QUERY + SqlStdOperatorTable.SCALAR_QUERY, + SqlStdOperatorTable.EXISTS ) builtInSqlOperators.foreach(register) http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala new file mode 100644 index 0000000..5bc6e4a --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.batch.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.utils.TableTestBase +import org.apache.flink.api.table.utils.TableTestUtil._ +import org.junit.Test + +class SetOperatorsTest extends TableTestBase { + + @Test + def testExists(): Unit = { + val util = batchTestUtil() + util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string) + util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string) + + val expected = unaryNode( + "DataSetCalc", + binaryNode( + "DataSetJoin", + batchTableNode(0), + unaryNode( + "DataSetAggregate", + unaryNode( + "DataSetCalc", + binaryNode( + "DataSetJoin", + batchTableNode(1), + unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "a_long"), + term("select", "a_long") + ), + term("where", "=(a_long, b_long)"), + term("join", "b_long", "b_int", "b_string", "a_long"), + term("joinType", "InnerJoin") + ), + term("select", "a_long", "true AS $f0") + ), + term("groupBy", "a_long"), + term("select", "a_long", "MIN($f0) AS $f1") + ), + term("where", "=(a_long, a_long0)"), + term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"), + term("joinType", "InnerJoin") + ), + term("select", "a_int", "a_string") + ) + + util.verifySql( + "SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)", + expected + ) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala index ce693ff..2ea15a0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala @@ -56,6 +56,10 @@ abstract class TableTestUtil { def addTable[T: TypeInformation](name: String, fields: Expression*): Table def verifySql(query: String, expected: String): Unit def verifyTable(resultTable: Table, expected: String): Unit + + // the print methods are for debugging purposes only + def printTable(resultTable: Table): Unit + def printSql(query: String): Unit } object TableTestUtil { @@ -87,6 +91,7 @@ object TableTestUtil { def streamTableNode(idx: Int): String = { s"DataStreamScan(table=[[_DataStreamTable_$idx]])" } + } case class BatchTableTestUtil() extends TableTestUtil { @@ -121,6 +126,16 @@ case class BatchTableTestUtil() extends TableTestUtil { expected.split("\n").map(_.trim).mkString("\n"), actual.split("\n").map(_.trim).mkString("\n")) } + + def printTable(resultTable: Table): Unit = { + val relNode = resultTable.getRelNode + val optimized = tEnv.optimize(relNode) + println(RelOptUtil.toString(optimized)) + } + + def printSql(query: String): Unit = { + printTable(tEnv.sql(query)) + } } case class StreamTableTestUtil() extends TableTestUtil { @@ -156,4 +171,15 @@ case class StreamTableTestUtil() extends TableTestUtil { expected.split("\n").map(_.trim).mkString("\n"), actual.split("\n").map(_.trim).mkString("\n")) } + + // the print methods are for debugging purposes only + def printTable(resultTable: Table): Unit = { + val relNode = resultTable.getRelNode + val optimized = tEnv.optimize(relNode) + println(RelOptUtil.toString(optimized)) + } + + def printSql(query: String): Unit = { + printTable(tEnv.sql(query)) + } }
