Repository: flink Updated Branches: refs/heads/release-1.3 024d8f577 -> 1d10cee91
[FLINK-7854] [table] Reject lateral table outer joins with predicates in SQL. This closes #4846. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d10cee9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d10cee9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d10cee9 Branch: refs/heads/release-1.3 Commit: 1d10cee91a12734de09b8c9665437309242e2dd5 Parents: 024d8f5 Author: Xingcan Cui <xingc...@gmail.com> Authored: Mon Oct 16 16:49:14 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Oct 19 19:30:05 2017 +0200 ---------------------------------------------------------------------- docs/dev/table/sql.md | 3 +- .../calcite/FlinkCalciteSqlValidator.scala | 29 +++++++++- .../sql/UserDefinedTableFunctionTest.scala | 60 +++++++++++++++++++- .../sql/UserDefinedTableFunctionTest.scala | 60 +++++++++++++++++++- 4 files changed, 148 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 26f4f1b..8a60694 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -381,11 +381,12 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) </tr> <tr> <td> - <strong>User Defined Table Functions (UDTF)</strong><br> + <strong>Join with User Defined Table Functions (UDTF)</strong><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> </td> <td> <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. 
</p> + <p><b>Note:</b> Currently only literal <code>TRUE</code> can be accepted as the predicate for the left outer join against a lateral table.</p> {% highlight sql %} SELECT users, tag FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala index 2bdf360..137a199 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala @@ -22,7 +22,8 @@ import org.apache.calcite.adapter.java.JavaTypeFactory import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql._ -import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl} +import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl, SqlValidatorScope} +import org.apache.flink.table.api.ValidationException /** * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]]. @@ -48,4 +49,30 @@ class FlinkCalciteSqlValidator( insert: SqlInsert): RelDataType = { typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType) } + + override def validateJoin(join: SqlJoin, scope: SqlValidatorScope): Unit = { + // Due to the improper translation of lateral table left outer joins in Calcite, we need to + // temporarily reject every predicate except literal TRUE until it is fixed (see FLINK-7865).
+ if (join.getJoinType == JoinType.LEFT && + isCollectionTable(join.getRight)) { + join.getCondition match { + case c: SqlLiteral if c.getValue == java.lang.Boolean.TRUE => // accept literal TRUE only + case null => // no predicate at all: leave it to Calcite's join validation below + case c => + throw new ValidationException( + s"Left outer joins with a table function do not accept a predicate such as $c. " + + s"Only literal TRUE is accepted.") + } + } + super.validateJoin(join, scope) + } + + private def isCollectionTable(node: SqlNode): Boolean = { + // TABLE (`func`(`foo`)) AS bar -- only this aliased form is matched + node match { + case n: SqlCall if n.getKind == SqlKind.AS => + n.getOperandList.get(0).getKind == SqlKind.COLLECTION_TABLE + case _ => false + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala index e091da2..3f7231e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.api.scala.batch.sql import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2} import org.apache.flink.table.utils._ @@ -74,7 +75,7 @@ class UserDefinedTableFunctionTest extends TableTestBase { } @Test - def testLeftOuterJoin(): Unit = { + def testLeftOuterJoinWithLiteralTrue(): Unit = { val util =
batchTestUtil() val func1 = new TableFunc1 util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) @@ -100,6 +101,46 @@ class UserDefinedTableFunctionTest extends TableTestBase { } @Test + def testLeftOuterJoinAsSubQuery(): Unit = { // ON TRUE lateral join nested in a subquery that is left-outer-joined on c2 = s + val util = batchTestUtil() + val func1 = new TableFunc1 + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2) + util.addFunction("func1", func1) + + val sqlQuery = + """ + | SELECT * + | FROM MyTable2 LEFT OUTER JOIN + | (SELECT c, s + | FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true) + | ON c2 = s """.stripMargin + + val expected = binaryNode( + "DataSetJoin", + batchTableNode(1), + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetCorrelate", + batchTableNode(0), + term("invocation", "func1($cor0.c)"), + term("function", func1.getClass.getCanonicalName), + term("rowType", + "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"), + term("joinType","LEFT") + ), + term("select", "c", "f0 AS s") + ), + term("where", "=(c2, s)"), + term("join", "a2", "b2", "c2", "c", "s"), + term("joinType", "LeftOuterJoin") + ) + + util.verifySql(sqlQuery, expected) + } + + @Test def testCustomType(): Unit = { val util = batchTestUtil() val func2 = new TableFunc2 @@ -235,4 +276,21 @@ class UserDefinedTableFunctionTest extends TableTestBase { util.verifySql(sqlQuery, expected) } + + /** + * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the + * join predicate can only be empty or literal true (the restriction should be removed in + * FLINK-7865).
+ */ + @Test(expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { + val util = batchTestUtil() + val func1 = new TableFunc1 + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addFunction("func1", func1) + + val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s" + + util.verifySql(sqlQuery, "n/a") + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala index 58eedd0..ae71749 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.api.scala.stream.sql import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2} import org.apache.flink.table.utils._ @@ -74,7 +75,7 @@ class UserDefinedTableFunctionTest extends TableTestBase { } @Test - def testLeftOuterJoin(): Unit = { + def testLeftOuterJoinWithLiteralTrue(): Unit = { val util = streamTestUtil() val func1 = new TableFunc1 util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) @@ -100,6 +101,46 @@ class UserDefinedTableFunctionTest extends TableTestBase { } @Test + def testLeftOuterJoinAsSubQuery(): Unit = { + val util = batchTestUtil() // NOTE(review): batch util + DataSet plan nodes in the streaming suite; looks copied from the batch test, so the streaming translation is not exercised -- confirm intent + val func1 = new TableFunc1
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2) + util.addFunction("func1", func1) + + val sqlQuery = + """ + | SELECT * + | FROM MyTable2 LEFT OUTER JOIN + | (SELECT c, s + | FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true) + | ON c2 = s """.stripMargin + + val expected = binaryNode( + "DataSetJoin", + batchTableNode(1), + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetCorrelate", + batchTableNode(0), + term("invocation", "func1($cor0.c)"), + term("function", func1.getClass.getCanonicalName), + term("rowType", + "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"), + term("joinType","LEFT") + ), + term("select", "c", "f0 AS s") + ), + term("where", "=(c2, s)"), + term("join", "a2", "b2", "c2", "c", "s"), + term("joinType", "LeftOuterJoin") + ) + + util.verifySql(sqlQuery, expected) + } + + @Test def testCustomType(): Unit = { val util = streamTestUtil() val func2 = new TableFunc2 @@ -234,4 +275,21 @@ class UserDefinedTableFunctionTest extends TableTestBase { util.verifySql(sqlQuery, expected) } + + /** + * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the + * join predicate can only be empty or literal true (the restriction should be removed in + * FLINK-7865). + */ + @Test(expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { + val util = streamTestUtil() + val func1 = new TableFunc1 + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.addFunction("func1", func1) + + val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s" + + util.verifySql(sqlQuery, "n/a") + } }