Repository: flink Updated Branches: refs/heads/master 808e0f9a6 -> 479be9d88
[FLINK-7854] [table] Reject lateral table outer joins with predicates in SQL. This closes #4846. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/479be9d8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/479be9d8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/479be9d8 Branch: refs/heads/master Commit: 479be9d888f9a338cac908d50eb753843f2aad4b Parents: 808e0f9 Author: Xingcan Cui <xingc...@gmail.com> Authored: Mon Oct 16 16:49:14 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Oct 19 17:19:19 2017 +0200 ---------------------------------------------------------------------- docs/dev/table/sql.md | 3 +- .../calcite/FlinkCalciteSqlValidator.scala | 29 ++++++++++++- .../table/api/batch/sql/CorrelateTest.scala | 42 +++++++++++++++++- .../validation/CorrelateValidationTest.scala | 45 ++++++++++++++++++++ .../table/api/stream/sql/CorrelateTest.scala | 42 +++++++++++++++++- .../validation/CorrelateValidationTest.scala | 45 ++++++++++++++++++++ 6 files changed, 202 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 8591910..8a86f72 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -435,11 +435,12 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) </tr> <tr> <td> - <strong>User Defined Table Functions (UDTF)</strong><br> + <strong>Join with User Defined Table Functions (UDTF)</strong><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> </td> <td> <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. 
</p> + <p><b>Note:</b> Currently only literal <code>TRUE</code> can be accepted as the predicate for the left outer join against a lateral table.</p> {% highlight sql %} SELECT users, tag FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala index 2bdf360..137a199 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala @@ -22,7 +22,8 @@ import org.apache.calcite.adapter.java.JavaTypeFactory import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql._ -import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl} +import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl, SqlValidatorScope} +import org.apache.flink.table.api.ValidationException /** * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]]. @@ -48,4 +49,30 @@ class FlinkCalciteSqlValidator( insert: SqlInsert): RelDataType = { typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType) } + + override def validateJoin(join: SqlJoin, scope: SqlValidatorScope): Unit = { + // Due to the improper translation of lateral table left outer join in Calcite, we need to + // temporarily forbid the common predicates until the problem is fixed (see FLINK-7865). 
+ if (join.getJoinType == JoinType.LEFT && + isCollectionTable(join.getRight)) { + join.getCondition match { + case c: SqlLiteral if c.booleanValue() && c.getValue.asInstanceOf[Boolean] => + // We accept only literal true + case c if null != c => + throw new ValidationException( + s"Left outer joins with a table function do not accept a predicate such as $c. " + + s"Only literal TRUE is accepted.") + } + } + super.validateJoin(join, scope) + } + + private def isCollectionTable(node: SqlNode): Boolean = { + // TABLE (`func`(`foo`)) AS bar + node match { + case n: SqlCall if n.getKind == SqlKind.AS => + n.getOperandList.get(0).getKind == SqlKind.COLLECTION_TABLE + case _ => false + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala index 6942a4e..7330825 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala @@ -76,7 +76,7 @@ class CorrelateTest extends TableTestBase { } @Test - def testLeftOuterJoin(): Unit = { + def testLeftOuterJoinWithLiteralTrue(): Unit = { val util = batchTestUtil() val func1 = new TableFunc1 util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) @@ -103,6 +103,46 @@ class CorrelateTest extends TableTestBase { } @Test + def testLeftOuterJoinAsSubQuery(): Unit = { + val util = batchTestUtil() + val func1 = new TableFunc1 + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2) + util.addFunction("func1", func1) + + val 
sqlQuery = + """ + | SELECT * + | FROM MyTable2 LEFT OUTER JOIN + | (SELECT c, s + | FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true) + | ON c2 = s """.stripMargin + + val expected = binaryNode( + "DataSetJoin", + batchTableNode(1), + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetCorrelate", + batchTableNode(0), + term("invocation", "func1($cor0.c)"), + term("correlate", "table(func1($cor0.c))"), + term("select", "a", "b", "c", "f0"), + term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), + term("joinType","LEFT") + ), + term("select", "c", "f0 AS s") + ), + term("where", "=(c2, s)"), + term("join", "a2", "b2", "c2", "c", "s"), + term("joinType", "LeftOuterJoin") + ) + + util.verifySql(sqlQuery, expected) + } + + @Test def testCustomType(): Unit = { val util = batchTestUtil() val func2 = new TableFunc2 http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala new file mode 100644 index 0000000..81381f3 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.batch.sql.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.{TableFunc1, TableTestBase} +import org.junit.Test + +class CorrelateValidationTest extends TableTestBase{ + + /** + * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the + * join predicate can only be empty or literal true (the restriction should be removed in + * FLINK-7865). + */ + @Test(expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { + val util = batchTestUtil() + val func1 = new TableFunc1 + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addFunction("func1", func1) + + val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s" + + util.verifySql(sqlQuery, "n/a") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala index ec61816..f243158 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala @@ -76,7 +76,7 @@ class CorrelateTest extends TableTestBase { } @Test - def testLeftOuterJoin(): Unit = { + def testLeftOuterJoinWithLiteralTrue(): Unit = { val util = streamTestUtil() val func1 = new TableFunc1 util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) @@ -103,6 +103,46 @@ class CorrelateTest extends TableTestBase { } @Test + def testLeftOuterJoinAsSubQuery(): Unit = { + val util = batchTestUtil() + val func1 = new TableFunc1 + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2) + util.addFunction("func1", func1) + + val sqlQuery = + """ + | SELECT * + | FROM MyTable2 LEFT OUTER JOIN + | (SELECT c, s + | FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true) + | ON c2 = s """.stripMargin + + val expected = binaryNode( + "DataSetJoin", + batchTableNode(1), + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetCorrelate", + batchTableNode(0), + term("invocation", "func1($cor0.c)"), + term("correlate", "table(func1($cor0.c))"), + term("select", "a", "b", "c", "f0"), + term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), + term("joinType","LEFT") + ), + term("select", "c", "f0 AS s") + ), + term("where", "=(c2, s)"), + term("join", "a2", "b2", "c2", "c", "s"), + term("joinType", "LeftOuterJoin") + ) + + util.verifySql(sqlQuery, expected) + } + + @Test def testCustomType(): Unit = { val util = streamTestUtil() val func2 = new TableFunc2 http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala new file mode 100644 index 0000000..926236e --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.utils.{TableFunc1, TableTestBase} +import org.junit.Test + +class CorrelateValidationTest extends TableTestBase{ + + /** + * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the + * join predicate can only be empty or literal true (the restriction should be removed in + * FLINK-7865). 
+ */ + @Test(expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { + val util = streamTestUtil() + val func1 = new TableFunc1 + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addFunction("func1", func1) + + val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s" + + util.verifySql(sqlQuery, "n/a") + } +}