Repository: flink
Updated Branches:
  refs/heads/master 808e0f9a6 -> 479be9d88


[FLINK-7854] [table] Reject lateral table outer joins with predicates in SQL.

This closes #4846.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/479be9d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/479be9d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/479be9d8

Branch: refs/heads/master
Commit: 479be9d888f9a338cac908d50eb753843f2aad4b
Parents: 808e0f9
Author: Xingcan Cui <xingc...@gmail.com>
Authored: Mon Oct 16 16:49:14 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Oct 19 17:19:19 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  3 +-
 .../calcite/FlinkCalciteSqlValidator.scala      | 29 ++++++++++++-
 .../table/api/batch/sql/CorrelateTest.scala     | 42 +++++++++++++++++-
 .../validation/CorrelateValidationTest.scala    | 45 ++++++++++++++++++++
 .../table/api/stream/sql/CorrelateTest.scala    | 42 +++++++++++++++++-
 .../validation/CorrelateValidationTest.scala    | 45 ++++++++++++++++++++
 6 files changed, 202 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 8591910..8a86f72 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -435,11 +435,12 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
     </tr>
     <tr>
        <td>
-        <strong>User Defined Table Functions (UDTF)</strong><br>
+        <strong>Join with User Defined Table Functions (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
        <td>
       <p>UDTFs must be registered in the TableEnvironment. See the <a 
href="udfs.html">UDF documentation</a> for details on how to specify and 
register UDTFs. </p>
+      <p><b>Note:</b> Currently only literal <code>TRUE</code> can be accepted 
as the predicate for the left outer join against a lateral table.</p>
 {% highlight sql %}
 SELECT users, tag
 FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag

http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
index 2bdf360..137a199 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.adapter.java.JavaTypeFactory
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql._
-import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl}
+import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl, 
SqlValidatorScope}
+import org.apache.flink.table.api.ValidationException
 
 /**
  * This is a copy of Calcite's CalciteSqlValidator to use with 
[[FlinkPlannerImpl]].
@@ -48,4 +49,30 @@ class FlinkCalciteSqlValidator(
       insert: SqlInsert): RelDataType = {
     typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
   }
+
+  override def validateJoin(join: SqlJoin, scope: SqlValidatorScope): Unit = {
+    // Due to the improper translation of lateral table left outer join in 
Calcite, we need to
+    // temporarily forbid the common predicates until the problem is fixed 
(see FLINK-7865).
+    if (join.getJoinType == JoinType.LEFT &&
+      isCollectionTable(join.getRight)) {
+      join.getCondition match {
+        case c: SqlLiteral if c.booleanValue() && 
c.getValue.asInstanceOf[Boolean] =>
+          // We accept only literal true
+        case c if null != c =>
+          throw new ValidationException(
+            s"Left outer joins with a table function do not accept a predicte 
such as $c. " +
+              s"Only literal TRUE is accepted.")
+      }
+    }
+    super.validateJoin(join, scope)
+  }
+
+  private def isCollectionTable(node: SqlNode): Boolean = {
+    // TABLE (`func`(`foo`)) AS bar
+    node match {
+      case n: SqlCall if n.getKind == SqlKind.AS =>
+        n.getOperandList.get(0).getKind == SqlKind.COLLECTION_TABLE
+      case _ => false
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
index 6942a4e..7330825 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
@@ -76,7 +76,7 @@ class CorrelateTest extends TableTestBase {
   }
 
   @Test
-  def testLeftOuterJoin(): Unit = {
+  def testLeftOuterJoinWithLiteralTrue(): Unit = {
     val util = batchTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -103,6 +103,46 @@ class CorrelateTest extends TableTestBase {
   }
 
   @Test
+  def testLeftOuterJoinAsSubQuery(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
+    util.addFunction("func1", func1)
+
+    val sqlQuery =
+    """
+      | SELECT *
+      | FROM MyTable2 LEFT OUTER JOIN
+      |  (SELECT c, s
+      |   FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true)
+      | ON c2 = s """.stripMargin
+
+    val expected = binaryNode(
+      "DataSetJoin",
+      batchTableNode(1),
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+         "DataSetCorrelate",
+          batchTableNode(0),
+          term("invocation", "func1($cor0.c)"),
+          term("correlate", "table(func1($cor0.c))"),
+          term("select", "a", "b", "c", "f0"),
+          term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, 
VARCHAR(65536) f0)"),
+          term("joinType","LEFT")
+        ),
+        term("select", "c", "f0 AS s")
+      ),
+      term("where", "=(c2, s)"),
+      term("join", "a2", "b2", "c2", "c", "s"),
+      term("joinType", "LeftOuterJoin")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testCustomType(): Unit = {
     val util = batchTestUtil()
     val func2 = new TableFunc2

http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala
new file mode 100644
index 0000000..81381f3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CorrelateValidationTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
+import org.junit.Test
+
+class CorrelateValidationTest extends TableTestBase{
+
+  /**
+    * Due to the improper translation of TableFunction left outer join (see 
CALCITE-2004), the
+    * join predicate can only be empty or literal true (the restriction should 
be removed in
+    * FLINK-7865).
+    */
+  @Test(expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) 
AS T(s) ON c = s"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index ec61816..f243158 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -76,7 +76,7 @@ class CorrelateTest extends TableTestBase {
   }
 
   @Test
-  def testLeftOuterJoin(): Unit = {
+  def testLeftOuterJoinWithLiteralTrue(): Unit = {
     val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -103,6 +103,46 @@ class CorrelateTest extends TableTestBase {
   }
 
   @Test
+  def testLeftOuterJoinAsSubQuery(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
+    util.addFunction("func1", func1)
+
+    val sqlQuery =
+      """
+        | SELECT *
+        | FROM MyTable2 LEFT OUTER JOIN
+        |  (SELECT c, s
+        |   FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on 
true)
+        | ON c2 = s """.stripMargin
+
+    val expected = binaryNode(
+      "DataSetJoin",
+      batchTableNode(1),
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetCorrelate",
+          batchTableNode(0),
+          term("invocation", "func1($cor0.c)"),
+          term("correlate", "table(func1($cor0.c))"),
+          term("select", "a", "b", "c", "f0"),
+          term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, 
VARCHAR(65536) f0)"),
+          term("joinType","LEFT")
+        ),
+        term("select", "c", "f0 AS s")
+      ),
+      term("where", "=(c2, s)"),
+      term("join", "a2", "b2", "c2", "c", "s"),
+      term("joinType", "LeftOuterJoin")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testCustomType(): Unit = {
     val util = streamTestUtil()
     val func2 = new TableFunc2

http://git-wip-us.apache.org/repos/asf/flink/blob/479be9d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala
new file mode 100644
index 0000000..926236e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/CorrelateValidationTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
+import org.junit.Test
+
+class CorrelateValidationTest extends TableTestBase{
+
+  /**
+    * Due to the improper translation of TableFunction left outer join (see 
CALCITE-2004), the
+    * join predicate can only be empty or literal true (the restriction should 
be removed in
+    * FLINK-7865).
+    */
+  @Test(expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) 
AS T(s) ON c = s"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+}

Reply via email to